diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 77f3908414bf..eaa6f0655daf 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1165,12 +1165,13 @@ func (tc *TxnCoordSender) IsSerializablePushAndRefreshNotPossible() bool { tc.mu.Lock() defer tc.mu.Unlock() + isTxnSerializable := tc.mu.txn.IsoLevel == isolation.Serializable isTxnPushed := tc.mu.txn.WriteTimestamp != tc.mu.txn.ReadTimestamp refreshAttemptNotPossible := tc.interceptorAlloc.txnSpanRefresher.refreshInvalid || tc.mu.txn.CommitTimestampFixed // We check CommitTimestampFixed here because, if that's set, refreshing // of reads is not performed. - return isTxnPushed && refreshAttemptNotPossible + return isTxnSerializable && isTxnPushed && refreshAttemptNotPossible } // Epoch is part of the kv.TxnSender interface. diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 9798e285b99d..801d4e892f1e 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -739,6 +739,7 @@ go_test( "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvpb", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/concurrency/isolation", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/protectedts", "//pkg/multitenant/tenantcapabilities", diff --git a/pkg/sql/internal_test.go b/pkg/sql/internal_test.go index acc8d22f129d..4002aa063c0c 100644 --- a/pkg/sql/internal_test.go +++ b/pkg/sql/internal_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" @@ -521,37 +522,71 @@ func TestInternalExecutorPushDetectionInTxn(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - params, _ := tests.CreateTestServerParams() - si, _, db := serverutils.StartServer(t, params) - defer si.Stopper().Stop(ctx) - s := si.(*server.TestServer) - - // Setup a pushed txn. - txn := db.NewTxn(ctx, "test") - keyA := roachpb.Key("a") - _, err := db.Get(ctx, keyA) - require.NoError(t, err) - require.NoError(t, txn.Put(ctx, keyA, "x")) - require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed") - - // Fix the txn's timestamp, such that - // txn.IsSerializablePushAndRefreshNotPossible() and the connExecutor is - // tempted to generate a retriable error eagerly. - txn.CommitTimestamp() - require.True(t, txn.IsSerializablePushAndRefreshNotPossible()) - - tr := s.Tracer() - execCtx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording") - defer getRecAndFinish() - ie := s.InternalExecutor().(*sql.InternalExecutor) - _, err = ie.Exec(execCtx, "test", txn, "select 42") - require.NoError(t, err) - require.NoError(t, testutils.MatchInOrder(getRecAndFinish().String(), - "push detected for non-refreshable txn but auto-retry not possible")) - require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed") + for _, tt := range []struct { + serializable bool + pushed bool + refreshable bool + exp bool + }{ + {serializable: false, pushed: false, refreshable: false, exp: false}, + {serializable: false, pushed: false, refreshable: true, exp: false}, + {serializable: false, pushed: true, refreshable: false, exp: false}, + {serializable: false, pushed: true, refreshable: true, exp: false}, + {serializable: true, pushed: false, refreshable: false, exp: false}, + {serializable: true, pushed: false, refreshable: true, exp: false}, + {serializable: true, pushed: true, refreshable: false, exp: true}, + {serializable: true, pushed: true, refreshable: true, exp: false}, + {serializable: true, pushed: true, refreshable: true, exp: false}, + } { + name := fmt.Sprintf("serializable=%t,pushed=%t,refreshable=%t", + tt.serializable, tt.pushed, tt.refreshable) + t.Run(name, func(t *testing.T) { + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + si, _, db := serverutils.StartServer(t, params) + defer si.Stopper().Stop(ctx) + s := si.(*server.TestServer) + + // Setup a txn. + txn := db.NewTxn(ctx, "test") + keyA := roachpb.Key("a") + if !tt.serializable { + require.NoError(t, txn.SetIsoLevel(isolation.Snapshot)) + } + if tt.pushed { + // Read outside the txn. + _, err := db.Get(ctx, keyA) + require.NoError(t, err) + // Write to the same key inside the txn to push its write timestamp. + require.NoError(t, txn.Put(ctx, keyA, "x")) + require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed") + } + if !tt.refreshable { + // Fix the txn's timestamp to prevent refreshes. + txn.CommitTimestamp() + } - require.NoError(t, txn.Rollback(ctx)) + // Are txn.IsSerializablePushAndRefreshNotPossible() and the connExecutor + // tempted to generate a retriable error eagerly? + require.Equal(t, tt.exp, txn.IsSerializablePushAndRefreshNotPossible()) + if !tt.exp { + // Test case no longer interesting. + return + } + + tr := s.Tracer() + execCtx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording") + defer getRecAndFinish() + ie := s.InternalExecutor().(*sql.InternalExecutor) + _, err := ie.Exec(execCtx, "test", txn, "select 42") + require.NoError(t, err) + require.NoError(t, testutils.MatchInOrder(getRecAndFinish().String(), + "push detected for non-refreshable txn but auto-retry not possible")) + require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed") + + require.NoError(t, txn.Rollback(ctx)) + }) + } } func TestInternalExecutorInLeafTxnDoesNotPanic(t *testing.T) {