Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): check response of receipt modacks for exactly once delivery #7568

Merged
merged 9 commits into from
Mar 22, 2023
5 changes: 2 additions & 3 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2020,11 +2020,10 @@ func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) {

func TestIntegration_TopicUpdateSchema(t *testing.T) {
ctx := context.Background()
// TODO(hongalex): update these staging endpoints after schema evolution is GA.
c := integrationTestClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
c := integrationTestClient(ctx, t)
defer c.Close()

sc := integrationTestSchemaClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
sc := integrationTestSchemaClient(ctx, t)
defer sc.Close()

schemaContent, err := ioutil.ReadFile("testdata/schema/us-states.avsc")
Expand Down
86 changes: 63 additions & 23 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,17 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
// do a receipt mod-ack when streaming.
maxExt := time.Now().Add(it.po.maxExtension)
ackIDs := map[string]*AckResult{}
it.eoMu.RLock()
exactlyOnceDelivery := it.enableExactlyOnceDelivery
it.eoMu.RUnlock()
it.mu.Lock()

// pendingMessages maps ackID -> message, and is used
// only when exactly once delivery is enabled.
// At first, all messages are pending, and they
// are removed if the modack call fails. All other
// messages are returned to the client for processing.
pendingMessages := make(map[string]*ipubsub.Message)
for _, m := range msgs {
ackID := msgAckID(m)
addRecv(m.ID, ackID, now)
Expand All @@ -264,22 +274,52 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
// possible if there are retries.
if _, ok := it.pendingNacks[ackID]; !ok {
// Don't use the message's AckResult here since these are only for receipt modacks.
// ModAckResults are transparent to the user anyway so these can automatically succeed.
// modack results are transparent to the user so these can automatically succeed unless
// exactly once is enabled.
// We can't use an empty AckResult here either since SetAckResult will try to
// close the channel without checking if it exists.
ackIDs[ackID] = newSuccessAckResult()
if !exactlyOnceDelivery {
ackIDs[ackID] = newSuccessAckResult()
} else {
ackIDs[ackID] = ipubsub.NewAckResult()
pendingMessages[ackID] = m
}
}
}
deadline := it.ackDeadline()
it.mu.Unlock()
go func() {
if len(ackIDs) > 0 {
// Don't check the return value of this since modacks are fire and forget,
// meaning errors should not be propagated to the client.
it.sendModAck(ackIDs, deadline)

if len(ackIDs) > 0 {
// When exactly once delivery is not enabled, modacks are fire and forget.
if !exactlyOnceDelivery {
go func() {
it.sendModAck(ackIDs, deadline, false)
}()
return msgs, nil
}
}()
return msgs, nil

// If exactly once is enabled, we should wait until modack responses are successes
// before attempting to process messages.
it.sendModAck(ackIDs, deadline, false)
for ackID, ar := range ackIDs {
ctx := context.Background()
_, err := ar.Get(ctx)
if err != nil {
delete(pendingMessages, ackID)
it.mu.Lock()
// Remove the message from lease management if modack fails here.
delete(it.keepAliveDeadlines, ackID)
it.mu.Unlock()
}
}
// Only return for processing messages that were successfully modack'ed.
v := make([]*ipubsub.Message, 0, len(pendingMessages))
for _, m := range pendingMessages {
v = append(v, m)
}
return v, nil
}
return nil, nil
}

// Get messages using the Pull RPC.
Expand Down Expand Up @@ -399,10 +439,10 @@ func (it *messageIterator) sender() {
}
if sendNacks {
// Nack indicated by modifying the deadline to zero.
it.sendModAck(nacks, 0)
it.sendModAck(nacks, 0, false)
}
if sendModAcks {
it.sendModAck(modAcks, dl)
it.sendModAck(modAcks, dl, true)
}
if sendPing {
it.pingStream()
Expand Down Expand Up @@ -479,7 +519,7 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
// percentile in order to capture the highest amount of time necessary without
// considering 1% outliers. If the ModAck RPC fails and exactly once delivery is
// enabled, we retry it in a separate goroutine for a short duration.
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration) {
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
deadlineSec := int32(deadline / time.Second)
ackIDs := make([]string, 0, len(m))
for k := range m {
Expand Down Expand Up @@ -517,7 +557,7 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur
if len(toRetry) > 0 {
// Retry modacks/nacks in a separate goroutine.
go func() {
it.retryModAcks(toRetry, deadlineSec)
it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
}()
}
}
Expand Down Expand Up @@ -563,29 +603,29 @@ func (it *messageIterator) retryAcks(m map[string]*AckResult) {
// in it.sendModAck(), with a max of 2500 ackIDs. Modacks are retried up to 3 times
// since after that, the message will have expired. Nacks are retried up until the default
// deadline of 10 minutes.
func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32) {
func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32, logOnInvalid bool) {
bo := newExactlyOnceBackoff()
retryCount := 0
ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline)
defer cancel()
for {
// If context is done, complete all remaining Nacks with DeadlineExceeded
// ModAcks are not exposed to the user so these don't need to be modified.
// If context is done, complete all AckResults with errors.
if ctx.Err() != nil {
if deadlineSec == 0 {
for _, r := range m {
ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
}
for _, r := range m {
ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
}
return
}
// Only retry modack requests up to 3 times.
if deadlineSec != 0 && retryCount > 3 {
ackIDs := make([]string, 0, len(m))
for k := range m {
for k, ar := range m {
ackIDs = append(ackIDs, k)
ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed"))
}
if logOnInvalid {
log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs)
}
log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs)
return
}
// Don't need to split map since this is the retry function and
Expand Down Expand Up @@ -723,7 +763,7 @@ func extractMetadata(err error) (*status.Status, map[string]string) {
return nil, nil
}

// processResults processes AckResults by referring to errorStatus and errorsMap.
// processResults processes AckResults by referring to errorStatus and errorsByAckID.
// The errors returned by the server in `errorStatus` or in `errorsByAckID`
// are used to complete the AckResults in `ackResMap` (with a success
// or error) or to return requests for further retries.
Expand Down
34 changes: 7 additions & 27 deletions pubsub/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -680,8 +679,8 @@ func TestExactlyOnceDelivery_NackSuccess(t *testing.T) {
}
}

func TestExactlyOnceDelivery_NackRetry_DeadlineExceeded(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
func TestExactlyOnceDelivery_ReceiptModackError(t *testing.T) {
ctx := context.Background()
srv := pstest.NewServer(pstest.WithErrorInjection("ModifyAckDeadline", codes.Internal, "internal error"))
client, err := NewClient(ctx, projName,
option.WithEndpoint(srv.Addr),
Expand All @@ -708,31 +707,12 @@ func TestExactlyOnceDelivery_NackRetry_DeadlineExceeded(t *testing.T) {
if _, err := r.Get(ctx); err != nil {
t.Fatalf("failed to publish message: %v", err)
}
s.ReceiveSettings.MaxExtensionPeriod = 1 * time.Minute

s.ReceiveSettings = ReceiveSettings{
NumGoroutines: 1,
// This needs to be greater than total deadline otherwise the message will be redelivered.
MinExtensionPeriod: 2 * time.Minute,
MaxExtensionPeriod: 2 * time.Minute,
}
ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
// Override the default timeout here so this test doesn't take 10 minutes.
exactlyOnceDeliveryRetryDeadline = 20 * time.Second
var once sync.Once
err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
once.Do(func() {
ar := msg.NackWithResult()
s, err := ar.Get(ctx)
if s != AcknowledgeStatusOther {
t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusOther)
}
wantErr := context.DeadlineExceeded
if !errors.Is(err, wantErr) {
t.Errorf("AckResult error\ngot %v\nwant %s", err, wantErr)
}
cancel()
})
s.Receive(ctx, func(ctx context.Context, msg *Message) {
t.Fatal("expected message to not have been delivered when exactly once enabled")
})
if err != nil {
t.Fatalf("s.Receive err: %v", err)
}
}