From 94d040898cc9e85fdac76560765b01cfd019d0b4 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Wed, 22 Mar 2023 15:16:57 -0700 Subject: [PATCH] fix(pubsub): check response of receipt modacks for exactly once delivery (#7568) * fix(pubsub): check response of receipt modacks for exactly once delivery * document behavior of pendingMessages * document behavior of pendingMessages * remove defer for unlocking --- pubsub/integration_test.go | 5 +-- pubsub/iterator.go | 86 +++++++++++++++++++++++++++---------- pubsub/subscription_test.go | 34 +++------------ 3 files changed, 72 insertions(+), 53 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 8b8b8ca3841f..595557add413 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -2020,11 +2020,10 @@ func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) { func TestIntegration_TopicUpdateSchema(t *testing.T) { ctx := context.Background() - // TODO(hongalex): update these staging endpoints after schema evolution is GA. - c := integrationTestClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443")) + c := integrationTestClient(ctx, t) defer c.Close() - sc := integrationTestSchemaClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443")) + sc := integrationTestSchemaClient(ctx, t) defer sc.Close() schemaContent, err := ioutil.ReadFile("testdata/schema/us-states.avsc") diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 870e34dd7d59..f45f1b995a53 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -255,7 +255,17 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { // do a receipt mod-ack when streaming. maxExt := time.Now().Add(it.po.maxExtension) ackIDs := map[string]*AckResult{} + it.eoMu.RLock() + exactlyOnceDelivery := it.enableExactlyOnceDelivery + it.eoMu.RUnlock() it.mu.Lock() + + // pendingMessages maps ackID -> message, and is used + // only when exactly once delivery is enabled. + // At first, all messages are pending, and they + // are removed if the modack call fails. All other + // messages are returned to the client for processing. + pendingMessages := make(map[string]*ipubsub.Message) for _, m := range msgs { ackID := msgAckID(m) addRecv(m.ID, ackID, now) @@ -264,22 +274,52 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { // possible if there are retries. if _, ok := it.pendingNacks[ackID]; !ok { // Don't use the message's AckResult here since these are only for receipt modacks. - // ModAckResults are transparent to the user anyway so these can automatically succeed. + // modack results are transparent to the user so these can automatically succeed unless + // exactly once is enabled. // We can't use an empty AckResult here either since SetAckResult will try to // close the channel without checking if it exists. - ackIDs[ackID] = newSuccessAckResult() + if !exactlyOnceDelivery { + ackIDs[ackID] = newSuccessAckResult() + } else { + ackIDs[ackID] = ipubsub.NewAckResult() + pendingMessages[ackID] = m + } } } deadline := it.ackDeadline() it.mu.Unlock() - go func() { - if len(ackIDs) > 0 { - // Don't check the return value of this since modacks are fire and forget, - // meaning errors should not be propagated to the client. - it.sendModAck(ackIDs, deadline) + + if len(ackIDs) > 0 { + // When exactly once delivery is not enabled, modacks are fire and forget. + if !exactlyOnceDelivery { + go func() { + it.sendModAck(ackIDs, deadline, false) + }() + return msgs, nil } - }() - return msgs, nil + + // If exactly once is enabled, we should wait until modack responses are successes + // before attempting to process messages. + it.sendModAck(ackIDs, deadline, false) + for ackID, ar := range ackIDs { + ctx := context.Background() + _, err := ar.Get(ctx) + if err != nil { + delete(pendingMessages, ackID) + it.mu.Lock() + // Remove the message from lease management if modack fails here. + delete(it.keepAliveDeadlines, ackID) + it.mu.Unlock() + } + } + // Only return for processing messages that were successfully modack'ed. + v := make([]*ipubsub.Message, 0, len(pendingMessages)) + for _, m := range pendingMessages { + v = append(v, m) + } + return v, nil + } + return nil, nil } // Get messages using the Pull RPC. @@ -399,10 +439,10 @@ func (it *messageIterator) sender() { } if sendNacks { // Nack indicated by modifying the deadline to zero. - it.sendModAck(nacks, 0) + it.sendModAck(nacks, 0, false) } if sendModAcks { - it.sendModAck(modAcks, dl) + it.sendModAck(modAcks, dl, true) } if sendPing { it.pingStream() @@ -479,7 +519,7 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) { // percentile in order to capture the highest amount of time necessary without // considering 1% outliers. If the ModAck RPC fails and exactly once delivery is // enabled, we retry it in a separate goroutine for a short duration. -func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration) { +func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) { deadlineSec := int32(deadline / time.Second) ackIDs := make([]string, 0, len(m)) for k := range m { @@ -517,7 +557,7 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur if len(toRetry) > 0 { // Retry modacks/nacks in a separate goroutine. go func() { - it.retryModAcks(toRetry, deadlineSec) + it.retryModAcks(toRetry, deadlineSec, logOnInvalid) }() } } @@ -563,29 +603,29 @@ func (it *messageIterator) retryAcks(m map[string]*AckResult) { // in it.sendModAck(), with a max of 2500 ackIDs. Modacks are retried up to 3 times // since after that, the message will have expired. Nacks are retried up until the default // deadline of 10 minutes. -func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32) { +func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32, logOnInvalid bool) { bo := newExactlyOnceBackoff() retryCount := 0 ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline) defer cancel() for { - // If context is done, complete all remaining Nacks with DeadlineExceeded - // ModAcks are not exposed to the user so these don't need to be modified. + // If context is done, complete all AckResults with errors. if ctx.Err() != nil { - if deadlineSec == 0 { - for _, r := range m { - ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err()) - } + for _, r := range m { + ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err()) } return } // Only retry modack requests up to 3 times. if deadlineSec != 0 && retryCount > 3 { ackIDs := make([]string, 0, len(m)) - for k := range m { + for k, ar := range m { ackIDs = append(ackIDs, k) + ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed")) + } + if logOnInvalid { + log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs) } - log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs) return } // Don't need to split map since this is the retry function and @@ -723,7 +763,7 @@ func extractMetadata(err error) (*status.Status, map[string]string) { return nil, nil } -// processResults processes AckResults by referring to errorStatus and errorsMap. +// processResults processes AckResults by referring to errorStatus and errorsByAckID. // The errors returned by the server in `errorStatus` or in `errorsByAckID` // are used to complete the AckResults in `ackResMap` (with a success // or error) or to return requests for further retries. diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index ed8c865480a6..ef11dcde51c5 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "sync" "testing" "time" @@ -680,8 +679,8 @@ func TestExactlyOnceDelivery_NackSuccess(t *testing.T) { } } -func TestExactlyOnceDelivery_NackRetry_DeadlineExceeded(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) +func TestExactlyOnceDelivery_ReceiptModackError(t *testing.T) { + ctx := context.Background() srv := pstest.NewServer(pstest.WithErrorInjection("ModifyAckDeadline", codes.Internal, "internal error")) client, err := NewClient(ctx, projName, option.WithEndpoint(srv.Addr), @@ -708,31 +707,12 @@ func TestExactlyOnceDelivery_NackRetry_DeadlineExceeded(t *testing.T) { if _, err := r.Get(ctx); err != nil { t.Fatalf("failed to publish message: %v", err) } + s.ReceiveSettings.MaxExtensionPeriod = 1 * time.Minute - s.ReceiveSettings = ReceiveSettings{ - NumGoroutines: 1, - // This needs to be greater than total deadline otherwise the message will be redelivered. - MinExtensionPeriod: 2 * time.Minute, - MaxExtensionPeriod: 2 * time.Minute, - } + ctx, cancel := context.WithTimeout(ctx, 20*time.Second) + defer cancel() // Override the default timeout here so this test doesn't take 10 minutes. - exactlyOnceDeliveryRetryDeadline = 20 * time.Second - var once sync.Once - err = s.Receive(ctx, func(ctx context.Context, msg *Message) { - once.Do(func() { - ar := msg.NackWithResult() - s, err := ar.Get(ctx) - if s != AcknowledgeStatusOther { - t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusOther) - } - wantErr := context.DeadlineExceeded - if !errors.Is(err, wantErr) { - t.Errorf("AckResult error\ngot %v\nwant %s", err, wantErr) - } - cancel() - }) + s.Receive(ctx, func(ctx context.Context, msg *Message) { + t.Fatal("expected message to not have been delivered when exactly once enabled") }) - if err != nil { - t.Fatalf("s.Receive err: %v", err) - } }