Skip to content

Commit

Permalink
Merge #101973
Browse files Browse the repository at this point in the history
101973: kv/sql: re-teach IsSerializablePushAndRefreshNotPossible about isolation levels r=rafiss a=nvanbenschoten

Informs #100131.

This commit revives logic in `Txn.IsSerializablePushAndRefreshNotPossible` that looks at the isolation level of the transaction. This logic was removed in 39ba88b.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed May 1, 2023
2 parents 71cf9fe + a36f663 commit 0b269bc
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 31 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,12 +1161,13 @@ func (tc *TxnCoordSender) IsSerializablePushAndRefreshNotPossible() bool {
tc.mu.Lock()
defer tc.mu.Unlock()

isTxnSerializable := tc.mu.txn.IsoLevel == isolation.Serializable
isTxnPushed := tc.mu.txn.WriteTimestamp != tc.mu.txn.ReadTimestamp
refreshAttemptNotPossible := tc.interceptorAlloc.txnSpanRefresher.refreshInvalid ||
tc.mu.txn.CommitTimestampFixed
// We check CommitTimestampFixed here because, if that's set, refreshing
// of reads is not performed.
return isTxnPushed && refreshAttemptNotPossible
return isTxnSerializable && isTxnPushed && refreshAttemptNotPossible
}

// Epoch is part of the kv.TxnSender interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ go_test(
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/multitenant/tenantcapabilities",
Expand Down
95 changes: 65 additions & 30 deletions pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -521,37 +522,71 @@ func TestInternalExecutorPushDetectionInTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
params, _ := tests.CreateTestServerParams()
si, _, db := serverutils.StartServer(t, params)
defer si.Stopper().Stop(ctx)
s := si.(*server.TestServer)

// Setup a pushed txn.
txn := db.NewTxn(ctx, "test")
keyA := roachpb.Key("a")
_, err := db.Get(ctx, keyA)
require.NoError(t, err)
require.NoError(t, txn.Put(ctx, keyA, "x"))
require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed")

// Fix the txn's timestamp, such that
// txn.IsSerializablePushAndRefreshNotPossible() and the connExecutor is
// tempted to generate a retriable error eagerly.
txn.CommitTimestamp()
require.True(t, txn.IsSerializablePushAndRefreshNotPossible())

tr := s.Tracer()
execCtx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer getRecAndFinish()
ie := s.InternalExecutor().(*sql.InternalExecutor)
_, err = ie.Exec(execCtx, "test", txn, "select 42")
require.NoError(t, err)
require.NoError(t, testutils.MatchInOrder(getRecAndFinish().String(),
"push detected for non-refreshable txn but auto-retry not possible"))
require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed")
for _, tt := range []struct {
serializable bool
pushed bool
refreshable bool
exp bool
}{
{serializable: false, pushed: false, refreshable: false, exp: false},
{serializable: false, pushed: false, refreshable: true, exp: false},
{serializable: false, pushed: true, refreshable: false, exp: false},
{serializable: false, pushed: true, refreshable: true, exp: false},
{serializable: true, pushed: false, refreshable: false, exp: false},
{serializable: true, pushed: false, refreshable: true, exp: false},
{serializable: true, pushed: true, refreshable: false, exp: true},
{serializable: true, pushed: true, refreshable: true, exp: false},
{serializable: true, pushed: true, refreshable: true, exp: false},
} {
name := fmt.Sprintf("serializable=%t,pushed=%t,refreshable=%t",
tt.serializable, tt.pushed, tt.refreshable)
t.Run(name, func(t *testing.T) {
ctx := context.Background()
params, _ := tests.CreateTestServerParams()
si, _, db := serverutils.StartServer(t, params)
defer si.Stopper().Stop(ctx)
s := si.(*server.TestServer)

// Setup a txn.
txn := db.NewTxn(ctx, "test")
keyA := roachpb.Key("a")
if !tt.serializable {
require.NoError(t, txn.SetIsoLevel(isolation.Snapshot))
}
if tt.pushed {
// Read outside the txn.
_, err := db.Get(ctx, keyA)
require.NoError(t, err)
// Write to the same key inside the txn to push its write timestamp.
require.NoError(t, txn.Put(ctx, keyA, "x"))
require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed")
}
if !tt.refreshable {
// Fix the txn's timestamp to prevent refreshes.
txn.CommitTimestamp()
}

require.NoError(t, txn.Rollback(ctx))
// Are txn.IsSerializablePushAndRefreshNotPossible() and the connExecutor
// tempted to generate a retriable error eagerly?
require.Equal(t, tt.exp, txn.IsSerializablePushAndRefreshNotPossible())
if !tt.exp {
// Test case no longer interesting.
return
}

tr := s.Tracer()
execCtx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer getRecAndFinish()
ie := s.InternalExecutor().(*sql.InternalExecutor)
_, err := ie.Exec(execCtx, "test", txn, "select 42")
require.NoError(t, err)
require.NoError(t, testutils.MatchInOrder(getRecAndFinish().String(),
"push detected for non-refreshable txn but auto-retry not possible"))
require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed")

require.NoError(t, txn.Rollback(ctx))
})
}
}

func TestInternalExecutorInLeafTxnDoesNotPanic(t *testing.T) {
Expand Down

0 comments on commit 0b269bc

Please sign in to comment.