Skip to content

Commit

Permalink
fix(pubsub): check response of receipt modacks for exactly once deliv…
Browse files Browse the repository at this point in the history
…ery (#7568)

* fix(pubsub): check response of receipt modacks for exactly once delivery

* document behavior of pendingMessages

* document behavior of pendingMessages

* remove defer for unlocking
  • Loading branch information
hongalex authored Mar 22, 2023
1 parent 3c27930 commit 94d0408
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 53 deletions.
5 changes: 2 additions & 3 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2020,11 +2020,10 @@ func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) {

func TestIntegration_TopicUpdateSchema(t *testing.T) {
ctx := context.Background()
// TODO(hongalex): update these staging endpoints after schema evolution is GA.
c := integrationTestClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
c := integrationTestClient(ctx, t)
defer c.Close()

sc := integrationTestSchemaClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
sc := integrationTestSchemaClient(ctx, t)
defer sc.Close()

schemaContent, err := ioutil.ReadFile("testdata/schema/us-states.avsc")
Expand Down
86 changes: 63 additions & 23 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,17 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
// do a receipt mod-ack when streaming.
maxExt := time.Now().Add(it.po.maxExtension)
ackIDs := map[string]*AckResult{}
it.eoMu.RLock()
exactlyOnceDelivery := it.enableExactlyOnceDelivery
it.eoMu.RUnlock()
it.mu.Lock()

// pendingMessages maps ackID -> message, and is used
// only when exactly once delivery is enabled.
// At first, all messages are pending, and they
// are removed if the modack call fails. All other
// messages are returned to the client for processing.
pendingMessages := make(map[string]*ipubsub.Message)
for _, m := range msgs {
ackID := msgAckID(m)
addRecv(m.ID, ackID, now)
Expand All @@ -264,22 +274,52 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
// possible if there are retries.
if _, ok := it.pendingNacks[ackID]; !ok {
// Don't use the message's AckResult here since these are only for receipt modacks.
// ModAckResults are transparent to the user anyway so these can automatically succeed.
// modack results are transparent to the user so these can automatically succeed unless
// exactly once is enabled.
// We can't use an empty AckResult here either since SetAckResult will try to
// close the channel without checking if it exists.
ackIDs[ackID] = newSuccessAckResult()
if !exactlyOnceDelivery {
ackIDs[ackID] = newSuccessAckResult()
} else {
ackIDs[ackID] = ipubsub.NewAckResult()
pendingMessages[ackID] = m
}
}
}
deadline := it.ackDeadline()
it.mu.Unlock()
go func() {
if len(ackIDs) > 0 {
// Don't check the return value of this since modacks are fire and forget,
// meaning errors should not be propagated to the client.
it.sendModAck(ackIDs, deadline)

if len(ackIDs) > 0 {
// When exactly once delivery is not enabled, modacks are fire and forget.
if !exactlyOnceDelivery {
go func() {
it.sendModAck(ackIDs, deadline, false)
}()
return msgs, nil
}
}()
return msgs, nil

// If exactly once is enabled, we should wait until modack responses are successes
// before attempting to process messages.
it.sendModAck(ackIDs, deadline, false)
for ackID, ar := range ackIDs {
ctx := context.Background()
_, err := ar.Get(ctx)
if err != nil {
delete(pendingMessages, ackID)
it.mu.Lock()
// Remove the message from lease management if modack fails here.
delete(it.keepAliveDeadlines, ackID)
it.mu.Unlock()
}
}
// Only return for processing messages that were successfully modack'ed.
v := make([]*ipubsub.Message, 0, len(pendingMessages))
for _, m := range pendingMessages {
v = append(v, m)
}
return v, nil
}
return nil, nil
}

// Get messages using the Pull RPC.
Expand Down Expand Up @@ -399,10 +439,10 @@ func (it *messageIterator) sender() {
}
if sendNacks {
// Nack indicated by modifying the deadline to zero.
it.sendModAck(nacks, 0)
it.sendModAck(nacks, 0, false)
}
if sendModAcks {
it.sendModAck(modAcks, dl)
it.sendModAck(modAcks, dl, true)
}
if sendPing {
it.pingStream()
Expand Down Expand Up @@ -479,7 +519,7 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
// percentile in order to capture the highest amount of time necessary without
// considering 1% outliers. If the ModAck RPC fails and exactly once delivery is
// enabled, we retry it in a separate goroutine for a short duration.
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration) {
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
deadlineSec := int32(deadline / time.Second)
ackIDs := make([]string, 0, len(m))
for k := range m {
Expand Down Expand Up @@ -517,7 +557,7 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur
if len(toRetry) > 0 {
// Retry modacks/nacks in a separate goroutine.
go func() {
it.retryModAcks(toRetry, deadlineSec)
it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
}()
}
}
Expand Down Expand Up @@ -563,29 +603,29 @@ func (it *messageIterator) retryAcks(m map[string]*AckResult) {
// in it.sendModAck(), with a max of 2500 ackIDs. Modacks are retried up to 3 times
// since after that, the message will have expired. Nacks are retried up until the default
// deadline of 10 minutes.
func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32) {
func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32, logOnInvalid bool) {
bo := newExactlyOnceBackoff()
retryCount := 0
ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline)
defer cancel()
for {
// If context is done, complete all remaining Nacks with DeadlineExceeded
// ModAcks are not exposed to the user so these don't need to be modified.
// If context is done, complete all AckResults with errors.
if ctx.Err() != nil {
if deadlineSec == 0 {
for _, r := range m {
ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
}
for _, r := range m {
ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
}
return
}
// Only retry modack requests up to 3 times.
if deadlineSec != 0 && retryCount > 3 {
ackIDs := make([]string, 0, len(m))
for k := range m {
for k, ar := range m {
ackIDs = append(ackIDs, k)
ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed"))
}
if logOnInvalid {
log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs)
}
log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs)
return
}
// Don't need to split map since this is the retry function and
Expand Down Expand Up @@ -723,7 +763,7 @@ func extractMetadata(err error) (*status.Status, map[string]string) {
return nil, nil
}

// processResults processes AckResults by referring to errorStatus and errorsMap.
// processResults processes AckResults by referring to errorStatus and errorsByAckID.
// The errors returned by the server in `errorStatus` or in `errorsByAckID`
// are used to complete the AckResults in `ackResMap` (with a success
// or error) or to return requests for further retries.
Expand Down
34 changes: 7 additions & 27 deletions pubsub/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -680,8 +679,8 @@ func TestExactlyOnceDelivery_NackSuccess(t *testing.T) {
}
}

func TestExactlyOnceDelivery_NackRetry_DeadlineExceeded(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
func TestExactlyOnceDelivery_ReceiptModackError(t *testing.T) {
ctx := context.Background()
srv := pstest.NewServer(pstest.WithErrorInjection("ModifyAckDeadline", codes.Internal, "internal error"))
client, err := NewClient(ctx, projName,
option.WithEndpoint(srv.Addr),
Expand All @@ -708,31 +707,12 @@ func TestExactlyOnceDelivery_NackRetry_DeadlineExceeded(t *testing.T) {
if _, err := r.Get(ctx); err != nil {
t.Fatalf("failed to publish message: %v", err)
}
s.ReceiveSettings.MaxExtensionPeriod = 1 * time.Minute

s.ReceiveSettings = ReceiveSettings{
NumGoroutines: 1,
// This needs to be greater than total deadline otherwise the message will be redelivered.
MinExtensionPeriod: 2 * time.Minute,
MaxExtensionPeriod: 2 * time.Minute,
}
ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
// Override the default timeout here so this test doesn't take 10 minutes.
exactlyOnceDeliveryRetryDeadline = 20 * time.Second
var once sync.Once
err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
once.Do(func() {
ar := msg.NackWithResult()
s, err := ar.Get(ctx)
if s != AcknowledgeStatusOther {
t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusOther)
}
wantErr := context.DeadlineExceeded
if !errors.Is(err, wantErr) {
t.Errorf("AckResult error\ngot %v\nwant %s", err, wantErr)
}
cancel()
})
s.Receive(ctx, func(ctx context.Context, msg *Message) {
t.Fatal("expected message to not have been delivered when exactly once enabled")
})
if err != nil {
t.Fatalf("s.Receive err: %v", err)
}
}

0 comments on commit 94d0408

Please sign in to comment.