diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index 9fab3b349f81..af84ff920636 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -43,6 +43,9 @@ type savepoint struct { seqNum enginepb.TxnSeq // txnSpanRefresher fields. + // TODO(mira): after we remove + // kv.transaction.keep_refresh_spans_on_savepoint_rollback.enabled, we won't + // need these two fields anymore. refreshSpans []roachpb.Span refreshInvalid bool } diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 86c0d8aed0ad..f1c387cc0a65 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -2997,3 +2997,63 @@ func TestTxnSetIsoLevel(t *testing.T) { require.Equal(t, prev, txn.IsoLevel()) } } + +// TestRefreshWithSavepoint is an integration test that ensures the correct +// behavior of refreshes under savepoint rollback. The test sets up a write-skew +// example where txn1 reads keyA and writes to keyB, while concurrently txn2 +// reads keyB and writes to keyA. The two txns can't be serialized so one is +// expected to get a serialization error upon commit. +// +// However, with the old behavior of discarding refresh spans upon savepoint +// rollback, the read corresponding to the discarded refresh span is not +// refreshed, so the conflict goes unnoticed and both txns commit successfully. +// See #111228 for more details. +func TestRefreshWithSavepoint(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testutils.RunTrueAndFalse(t, "keep-refresh-spans", func(t *testing.T, keepRefreshSpans bool) { + s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + ctx := context.Background() + defer s.Stopper().Stop(context.Background()) + + if keepRefreshSpans { + kvcoord.KeepRefreshSpansOnSavepointRollback.Override(ctx, &s.ClusterSettings().SV, true) + } else { + kvcoord.KeepRefreshSpansOnSavepointRollback.Override(ctx, &s.ClusterSettings().SV, false) + } + + keyA := roachpb.Key("a") + keyB := roachpb.Key("b") + txn1 := kvDB.NewTxn(ctx, "txn1") + txn2 := kvDB.NewTxn(ctx, "txn2") + + spt1, err := txn1.CreateSavepoint(ctx) + require.NoError(t, err) + + _, err = txn1.Get(ctx, keyA) + require.NoError(t, err) + + err = txn1.RollbackToSavepoint(ctx, spt1) + require.NoError(t, err) + + _, err = txn2.Get(ctx, keyB) + require.NoError(t, err) + + err = txn1.Put(ctx, keyB, "bb") + require.NoError(t, err) + + err = txn2.Put(ctx, keyA, "aa") + require.NoError(t, err) + + err = txn1.Commit(ctx) + if keepRefreshSpans { + require.Regexp(t, ".*RETRY_SERIALIZABLE - failed preemptive refresh due to conflicting locks on \"a\"*", err) + } else { + require.NoError(t, err) + } + + err = txn2.Commit(ctx) + require.NoError(t, err) + }) +} diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index 5bbba69a2595..99a5cd70b035 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -44,6 +44,23 @@ var MaxTxnRefreshSpansBytes = settings.RegisterIntSetting( 1<<22, /* 4 MB */ settings.WithPublic) +// KeepRefreshSpansOnSavepointRollback is a boolean flag that, when enabled, +// ensures that all refresh spans accumulated since a savepoint was created are +// kept even after the savepoint is rolled back. This ensures that the reads +// corresponding to the refresh spans are serialized correctly, even though they +// were rolled back. See #111228 for more details. +// When set to true, this setting corresponds to the correct new behavior, +// which also matches the Postgres behavior. We don't expect this new behavior +// to impact customers because they should already be able to handle +// serialization errors; in case any unforeseen customer issues arise, the +// setting here allows us to revert to the old behavior. +// TODO(mira): set the default to true after #113765. +var KeepRefreshSpansOnSavepointRollback = settings.RegisterBoolSetting( + settings.SystemVisible, + "kv.transaction.keep_refresh_spans_on_savepoint_rollback.enabled", + "if enabled, all refresh spans accumulated since a savepoint was created are kept after the savepoint is rolled back", + false) + // txnSpanRefresher is a txnInterceptor that collects the read spans of a // serializable transaction in the event it gets a serializable retry error. It // can then use the set of read spans to avoid retrying the transaction if all @@ -794,6 +811,9 @@ func (sr *txnSpanRefresher) createSavepointLocked(ctx context.Context, s *savepo // TODO(nvanbenschoten): make sure this works correctly with ReadCommitted. // The refresh spans should either be empty when captured into a savepoint or // should be cleared when the savepoint is rolled back to. + // TODO(mira): after we remove + // kv.transaction.keep_refresh_spans_on_savepoint_rollback.enabled, we won't + // need to keep refresh spans in the savepoint anymore. s.refreshSpans = make([]roachpb.Span, len(sr.refreshFootprint.asSlice())) copy(s.refreshSpans, sr.refreshFootprint.asSlice()) s.refreshInvalid = sr.refreshInvalid @@ -801,9 +821,11 @@ func (sr *txnSpanRefresher) createSavepointLocked(ctx context.Context, s *savepo // rollbackToSavepointLocked is part of the txnInterceptor interface. func (sr *txnSpanRefresher) rollbackToSavepointLocked(ctx context.Context, s savepoint) { - sr.refreshFootprint.clear() - sr.refreshFootprint.insert(s.refreshSpans...) - sr.refreshInvalid = s.refreshInvalid + if !KeepRefreshSpansOnSavepointRollback.Get(&sr.st.SV) { + sr.refreshFootprint.clear() + sr.refreshFootprint.insert(s.refreshSpans...) + sr.refreshInvalid = s.refreshInvalid + } } // closeLocked implements the txnInterceptor interface. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index 849d39d15035..18fd957f9c7a 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -1436,57 +1436,77 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { func TestTxnSpanRefresherSavepoint(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - tsr, mockSender := makeMockTxnSpanRefresher() - keyA, keyB := roachpb.Key("a"), roachpb.Key("b") - txn := makeTxnProto() + testutils.RunTrueAndFalse(t, "keep-refresh-spans", func(t *testing.T, keepRefreshSpans bool) { + ctx := context.Background() + tsr, mockSender := makeMockTxnSpanRefresher() - read := func(key roachpb.Key) { - ba := &kvpb.BatchRequest{} - ba.Header = kvpb.Header{Txn: &txn} - getArgs := kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: key}} - ba.Add(&getArgs) - mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { - require.Len(t, ba.Requests, 1) - require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner()) - - br := ba.CreateReply() - br.Txn = ba.Txn - return br, nil - }) - br, pErr := tsr.SendLocked(ctx, ba) - require.Nil(t, pErr) - require.NotNil(t, br) - } - read(keyA) - require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice()) + if keepRefreshSpans { + KeepRefreshSpansOnSavepointRollback.Override(ctx, &tsr.st.SV, true) + } else { + KeepRefreshSpansOnSavepointRollback.Override(ctx, &tsr.st.SV, false) + } - s := savepoint{} - tsr.createSavepointLocked(ctx, &s) + keyA, keyB := roachpb.Key("a"), roachpb.Key("b") + txn := makeTxnProto() - // Another read after the savepoint was created. - read(keyB) - require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshFootprint.asSlice()) + read := func(key roachpb.Key) { + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + getArgs := kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: key}} + ba.Add(&getArgs) + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 1) + require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner()) - require.Equal(t, []roachpb.Span{{Key: keyA}}, s.refreshSpans) - require.False(t, s.refreshInvalid) + br := ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + br, pErr := tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + } + read(keyA) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice()) - // Rollback the savepoint and check that refresh spans were overwritten. - tsr.rollbackToSavepointLocked(ctx, s) - require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice()) + s := savepoint{} + tsr.createSavepointLocked(ctx, &s) - // Check that rolling back to the savepoint resets refreshInvalid. - tsr.refreshInvalid = true - tsr.rollbackToSavepointLocked(ctx, s) - require.False(t, tsr.refreshInvalid) + // Another read after the savepoint was created. + read(keyB) + require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshFootprint.asSlice()) - // Set refreshInvalid and then create a savepoint. - tsr.refreshInvalid = true - s = savepoint{} - tsr.createSavepointLocked(ctx, &s) - require.True(t, s.refreshInvalid) - // Rollback to the savepoint check that refreshes are still invalid. - tsr.rollbackToSavepointLocked(ctx, s) - require.True(t, tsr.refreshInvalid) + require.Equal(t, []roachpb.Span{{Key: keyA}}, s.refreshSpans) + require.False(t, s.refreshInvalid) + + // Rollback the savepoint. + tsr.rollbackToSavepointLocked(ctx, s) + if keepRefreshSpans { + // Check that refresh spans were kept as such. + require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshFootprint.asSlice()) + } else { + // Check that refresh spans were overwritten. + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice()) + } + + tsr.refreshInvalid = true + tsr.rollbackToSavepointLocked(ctx, s) + if keepRefreshSpans { + // Check that rolling back to the savepoint keeps refreshInvalid as such. + require.True(t, tsr.refreshInvalid) + } else { + // Check that rolling back to the savepoint resets refreshInvalid. + require.False(t, tsr.refreshInvalid) + } + + // Set refreshInvalid and then create a savepoint. + tsr.refreshInvalid = true + s = savepoint{} + tsr.createSavepointLocked(ctx, &s) + require.True(t, s.refreshInvalid) + // Rollback to the savepoint check that refreshes are still invalid. + tsr.rollbackToSavepointLocked(ctx, s) + require.True(t, tsr.refreshInvalid) + }) } diff --git a/pkg/kv/kvpb/errors.go b/pkg/kv/kvpb/errors.go index 412d5a832ee8..2041456d0466 100644 --- a/pkg/kv/kvpb/errors.go +++ b/pkg/kv/kvpb/errors.go @@ -866,7 +866,7 @@ func (e *TransactionRetryError) SafeFormatError(p errors.Printer) (next error) { msg = redact.Sprintf(" - %s", e.ExtraMsg) } if e.ConflictingTxn != nil { - msg = redact.Sprintf(" %s - conflicting txn: meta={%s}", msg, e.ConflictingTxn.String()) + msg = redact.Sprintf("%s - conflicting txn: meta={%s}", msg, e.ConflictingTxn.String()) } p.Printf("TransactionRetryError: retry txn (%s%s)", redact.SafeString(TransactionRetryReason_name[int32(e.Reason)]), msg) return nil