Skip to content

Commit

Permalink
kv/sql: re-teach IsSerializablePushAndRefreshNotPossible about isolat…
Browse files Browse the repository at this point in the history
…ion levels

Informs cockroachdb#100131.

This commit revives logic in `Txn.IsSerializablePushAndRefreshNotPossible`
that looks at the isolation level of the transaction. This logic was removed
in 39ba88b.

Release note: None
  • Loading branch information
nvanbenschoten committed Apr 26, 2023
1 parent c2ceeb7 commit a36f663
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 31 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,12 +1165,13 @@ func (tc *TxnCoordSender) IsSerializablePushAndRefreshNotPossible() bool {
tc.mu.Lock()
defer tc.mu.Unlock()

isTxnSerializable := tc.mu.txn.IsoLevel == isolation.Serializable
isTxnPushed := tc.mu.txn.WriteTimestamp != tc.mu.txn.ReadTimestamp
refreshAttemptNotPossible := tc.interceptorAlloc.txnSpanRefresher.refreshInvalid ||
tc.mu.txn.CommitTimestampFixed
// We check CommitTimestampFixed here because, if that's set, refreshing
// of reads is not performed.
return isTxnPushed && refreshAttemptNotPossible
return isTxnSerializable && isTxnPushed && refreshAttemptNotPossible
}

// Epoch is part of the kv.TxnSender interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ go_test(
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/multitenant/tenantcapabilities",
Expand Down
95 changes: 65 additions & 30 deletions pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -521,37 +522,71 @@ func TestInternalExecutorPushDetectionInTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
params, _ := tests.CreateTestServerParams()
si, _, db := serverutils.StartServer(t, params)
defer si.Stopper().Stop(ctx)
s := si.(*server.TestServer)

// Setup a pushed txn.
txn := db.NewTxn(ctx, "test")
keyA := roachpb.Key("a")
_, err := db.Get(ctx, keyA)
require.NoError(t, err)
require.NoError(t, txn.Put(ctx, keyA, "x"))
require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed")

// Fix the txn's timestamp, such that
// txn.IsSerializablePushAndRefreshNotPossible() and the connExecutor is
// tempted to generate a retriable error eagerly.
txn.CommitTimestamp()
require.True(t, txn.IsSerializablePushAndRefreshNotPossible())

tr := s.Tracer()
execCtx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer getRecAndFinish()
ie := s.InternalExecutor().(*sql.InternalExecutor)
_, err = ie.Exec(execCtx, "test", txn, "select 42")
require.NoError(t, err)
require.NoError(t, testutils.MatchInOrder(getRecAndFinish().String(),
"push detected for non-refreshable txn but auto-retry not possible"))
require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed")
for _, tt := range []struct {
serializable bool
pushed bool
refreshable bool
exp bool
}{
{serializable: false, pushed: false, refreshable: false, exp: false},
{serializable: false, pushed: false, refreshable: true, exp: false},
{serializable: false, pushed: true, refreshable: false, exp: false},
{serializable: false, pushed: true, refreshable: true, exp: false},
{serializable: true, pushed: false, refreshable: false, exp: false},
{serializable: true, pushed: false, refreshable: true, exp: false},
{serializable: true, pushed: true, refreshable: false, exp: true},
{serializable: true, pushed: true, refreshable: true, exp: false},
{serializable: true, pushed: true, refreshable: true, exp: false},
} {
name := fmt.Sprintf("serializable=%t,pushed=%t,refreshable=%t",
tt.serializable, tt.pushed, tt.refreshable)
t.Run(name, func(t *testing.T) {
ctx := context.Background()
params, _ := tests.CreateTestServerParams()
si, _, db := serverutils.StartServer(t, params)
defer si.Stopper().Stop(ctx)
s := si.(*server.TestServer)

// Setup a txn.
txn := db.NewTxn(ctx, "test")
keyA := roachpb.Key("a")
if !tt.serializable {
require.NoError(t, txn.SetIsoLevel(isolation.Snapshot))
}
if tt.pushed {
// Read outside the txn.
_, err := db.Get(ctx, keyA)
require.NoError(t, err)
// Write to the same key inside the txn to push its write timestamp.
require.NoError(t, txn.Put(ctx, keyA, "x"))
require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed")
}
if !tt.refreshable {
// Fix the txn's timestamp to prevent refreshes.
txn.CommitTimestamp()
}

require.NoError(t, txn.Rollback(ctx))
// Are txn.IsSerializablePushAndRefreshNotPossible() and the connExecutor
// tempted to generate a retriable error eagerly?
require.Equal(t, tt.exp, txn.IsSerializablePushAndRefreshNotPossible())
if !tt.exp {
// Test case no longer interesting.
return
}

tr := s.Tracer()
execCtx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer getRecAndFinish()
ie := s.InternalExecutor().(*sql.InternalExecutor)
_, err := ie.Exec(execCtx, "test", txn, "select 42")
require.NoError(t, err)
require.NoError(t, testutils.MatchInOrder(getRecAndFinish().String(),
"push detected for non-refreshable txn but auto-retry not possible"))
require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed")

require.NoError(t, txn.Rollback(ctx))
})
}
}

func TestInternalExecutorInLeafTxnDoesNotPanic(t *testing.T) {
Expand Down

0 comments on commit a36f663

Please sign in to comment.