Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: permit some non-transactional batches to perform follower reads #68192

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func canSendToFollower(
ba roachpb.BatchRequest,
) bool {
return kvserver.BatchCanBeEvaluatedOnFollower(ba) &&
closedTimestampLikelySufficient(st, clock, ctPolicy, ba.Txn.RequiredFrontier()) &&
closedTimestampLikelySufficient(st, clock, ctPolicy, ba.RequiredFrontier()) &&
// NOTE: this call can be expensive, so perform it last. See #62447.
checkFollowerReadsEnabled(clusterID, st)
}
Expand Down
79 changes: 77 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ func TestCanSendToFollower(t *testing.T) {
ba.Add(req)
return ba
}
withBatchTimestamp := func(ba roachpb.BatchRequest, ts hlc.Timestamp) roachpb.BatchRequest {
ba.Timestamp = ts
return ba
}
withServerSideBatchTimestamp := func(ba roachpb.BatchRequest, ts hlc.Timestamp) roachpb.BatchRequest {
ba = withBatchTimestamp(ba, ts)
ba.TimestampFromServerClock = true
return ba
}

testCases := []struct {
name string
Expand All @@ -120,10 +129,40 @@ func TestCanSendToFollower(t *testing.T) {
exp bool
}{
{
name: "non-txn batch",
name: "non-txn batch, without ts",
ba: batch(nil, &roachpb.GetRequest{}),
exp: false,
},
{
name: "stale non-txn batch",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), stale),
exp: true,
},
{
name: "current-time non-txn batch",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), current),
exp: false,
},
{
name: "future non-txn batch",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), future),
exp: false,
},
{
name: "stale non-txn batch, server-side ts",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), stale),
exp: false,
},
{
name: "current-time non-txn batch, server-side ts",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), current),
exp: false,
},
{
name: "future non-txn batch, server-side ts",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), future),
exp: false,
},
{
name: "stale read",
ba: batch(txn(stale), &roachpb.GetRequest{}),
Expand Down Expand Up @@ -181,11 +220,47 @@ func TestCanSendToFollower(t *testing.T) {
exp: false,
},
{
name: "non-txn batch, global reads policy",
name: "non-txn batch, without ts, global reads policy",
ba: batch(nil, &roachpb.GetRequest{}),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: false,
},
{
name: "stale non-txn batch, global reads policy",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), stale),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: true,
},
{
name: "current-time non-txn batch, global reads policy",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), current),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: true,
},
{
name: "future non-txn batch, global reads policy",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), future),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: false,
},
{
name: "stale non-txn batch, server-side ts, global reads policy",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), stale),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: false,
},
{
name: "current-time non-txn batch, server-side ts, global reads policy",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), current),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: false,
},
{
name: "future non-txn batch, server-side ts, global reads policy",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), future),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: false,
},
{
name: "stale read, global reads policy",
ba: batch(txn(stale), &roachpb.GetRequest{}),
Expand Down
48 changes: 30 additions & 18 deletions pkg/kv/kvserver/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,27 +537,39 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) {
// drives up the test duration.
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, testingCloseFraction, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc, numNodes)
testutils.RunTrueAndFalse(t, "tsFromServer", func(t *testing.T, tsFromServer bool) {
ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration,
testingCloseFraction, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc, numNodes)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}
if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}

// Verify that we can serve a follower read at a timestamp with a
// transactional batch. Wait if necessary.
ts := tc.Server(0).Clock().Now()
baRead := makeTxnReadBatchForDesc(desc, ts)
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
})
// Verify that we can serve a follower read at a timestamp with a
// transactional batch. Wait if necessary.
ts := tc.Server(0).Clock().Now()
baRead := makeTxnReadBatchForDesc(desc, ts)
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
})

// Remove the transaction and send the request to all three replicas. One
// should succeed and the other two should return NotLeaseHolderErrors.
baRead.Txn = nil
verifyNotLeaseHolderErrors(t, baRead, repls, 2)
// Remove the transaction and send the request to all three replicas. If the
// batch indicates that the timestamp was set from the server's clock, then
// one should succeed and the other two should return NotLeaseHolderErrors.
// Otherwise, all three should succeed.
baRead.Txn = nil
if tsFromServer {
baRead.TimestampFromServerClock = true
verifyNotLeaseHolderErrors(t, baRead, repls, 2)
} else {
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
})
}
})
}

// Test that, during a merge, the closed timestamp of the subsumed RHS doesn't
Expand Down
17 changes: 11 additions & 6 deletions pkg/kv/kvserver/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,23 @@ var FollowerReadsEnabled = settings.RegisterBoolSetting(
// advanced closed timestamp.
func BatchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool {
// Explanation of conditions:
// 1. the batch needs to be part of a transaction, because non-transactional
// batches often rely on the server setting their timestamp. If a follower
// with a lagging clock sets their timestamp then they might miss past
// writes served at higher timestamps.
// 1. the batch cannot have or intend to receive a timestamp set from a
// server-side clock. If a follower with a lagging clock sets its timestamp
// and this then allows the follower to evaluate the batch as a follower
// read, then the batch might miss past writes served at higher timestamps
// on the leaseholder.
// 2. each request in the batch needs to be "transactional", because those are
// the only ones that have clearly defined semantics when served under the
// closed timestamp.
// 3. the batch needs to be read-only, because a follower replica cannot
// propose writes to Raft.
// 4. the batch needs to be non-locking, because unreplicated locks are only
// held on the leaseholder.
return ba.Txn != nil && ba.IsAllTransactional() && ba.IsReadOnly() && !ba.IsLocking()
tsFromServerClock := ba.Txn == nil && (ba.Timestamp.IsEmpty() || ba.TimestampFromServerClock)
if tsFromServerClock {
return false
}
return ba.IsAllTransactional() && ba.IsReadOnly() && !ba.IsLocking()
}

// canServeFollowerReadRLocked tests, when a range lease could not be acquired,
Expand Down Expand Up @@ -83,7 +88,7 @@ func (r *Replica) canServeFollowerReadRLocked(
return false
}

requiredFrontier := ba.Txn.RequiredFrontier()
requiredFrontier := ba.RequiredFrontier()
maxClosed, _ := r.maxClosedRLocked(ctx, requiredFrontier /* sufficient */)
canServeFollowerRead := requiredFrontier.LessEq(maxClosed)
tsDiff := requiredFrontier.GoTime().Sub(maxClosed.GoTime())
Expand Down
Loading