Skip to content

Commit

Permalink
kv: permit some non-transactional batches to perform follower reads
Browse files Browse the repository at this point in the history
Touches #67562.

This commit adds support for a subset of non-transactional batch
requests to perform follower reads. Specifically, it makes those that do
not rely on their timestamp being set from the server's clock eligible.

This condition is necessary because if a follower with a lagging clock
sets its timestamp and this then allows the follower to evaluate the
batch as a follower read, then the batch might miss past writes served
at higher timestamps on the leaseholder.
  • Loading branch information
nvanbenschoten committed Aug 2, 2021
1 parent 31af9e3 commit e024445
Show file tree
Hide file tree
Showing 7 changed files with 711 additions and 551 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func canSendToFollower(
ba roachpb.BatchRequest,
) bool {
return kvserver.BatchCanBeEvaluatedOnFollower(ba) &&
closedTimestampLikelySufficient(st, clock, ctPolicy, ba.Txn.RequiredFrontier()) &&
closedTimestampLikelySufficient(st, clock, ctPolicy, ba.RequiredFrontier()) &&
// NOTE: this call can be expensive, so perform it last. See #62447.
checkFollowerReadsEnabled(clusterID, st)
}
Expand Down
79 changes: 77 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ func TestCanSendToFollower(t *testing.T) {
ba.Add(req)
return ba
}
withBatchTimestamp := func(ba roachpb.BatchRequest, ts hlc.Timestamp) roachpb.BatchRequest {
ba.Timestamp = ts
return ba
}
withServerSideBatchTimestamp := func(ba roachpb.BatchRequest, ts hlc.Timestamp) roachpb.BatchRequest {
ba = withBatchTimestamp(ba, ts)
ba.TimestampFromServerClock = true
return ba
}

testCases := []struct {
name string
Expand All @@ -120,10 +129,40 @@ func TestCanSendToFollower(t *testing.T) {
exp bool
}{
{
name: "non-txn batch",
name: "non-txn batch, without ts",
ba: batch(nil, &roachpb.GetRequest{}),
exp: false,
},
{
name: "stale non-txn batch",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), stale),
exp: true,
},
{
name: "current-time non-txn batch",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), current),
exp: false,
},
{
name: "future non-txn batch",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), future),
exp: false,
},
{
name: "stale non-txn batch, server-side ts",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), stale),
exp: false,
},
{
name: "current-time non-txn batch, server-side ts",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), current),
exp: false,
},
{
name: "future non-txn batch, server-side ts",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), future),
exp: false,
},
{
name: "stale read",
ba: batch(txn(stale), &roachpb.GetRequest{}),
Expand Down Expand Up @@ -181,11 +220,47 @@ func TestCanSendToFollower(t *testing.T) {
exp: false,
},
{
name: "non-txn batch, global reads policy",
name: "non-txn batch, without ts, global reads policy",
ba: batch(nil, &roachpb.GetRequest{}),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: false,
},
{
name: "stale non-txn batch, global reads policy",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), stale),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: true,
},
{
name: "current-time non-txn batch, global reads policy",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), current),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: true,
},
{
name: "future non-txn batch, global reads policy",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), future),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: false,
},
{
name: "stale non-txn batch, server-side ts, global reads policy",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), stale),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: false,
},
{
name: "current-time non-txn batch, server-side ts, global reads policy",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), current),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: false,
},
{
name: "future non-txn batch, server-side ts, global reads policy",
ba: withServerSideBatchTimestamp(batch(nil, &roachpb.GetRequest{}), future),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: false,
},
{
name: "stale read, global reads policy",
ba: batch(txn(stale), &roachpb.GetRequest{}),
Expand Down
48 changes: 30 additions & 18 deletions pkg/kv/kvserver/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,27 +537,39 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) {
// drives up the test duration.
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, testingCloseFraction, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc, numNodes)
testutils.RunTrueAndFalse(t, "tsFromServer", func(t *testing.T, tsFromServer bool) {
ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration,
testingCloseFraction, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc, numNodes)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}
if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}

// Verify that we can serve a follower read at a timestamp with a
// transactional batch. Wait if necessary.
ts := tc.Server(0).Clock().Now()
baRead := makeTxnReadBatchForDesc(desc, ts)
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
})
// Verify that we can serve a follower read at a timestamp with a
// transactional batch. Wait if necessary.
ts := tc.Server(0).Clock().Now()
baRead := makeTxnReadBatchForDesc(desc, ts)
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
})

// Remove the transaction and send the request to all three replicas. One
// should succeed and the other two should return NotLeaseHolderErrors.
baRead.Txn = nil
verifyNotLeaseHolderErrors(t, baRead, repls, 2)
// Remove the transaction and send the request to all three replicas. If the
// batch indicates that the timestamp was set from the server's clock, then
// one should succeed and the other two should return NotLeaseHolderErrors.
// Otherwise, all three should succeed.
baRead.Txn = nil
if tsFromServer {
baRead.TimestampFromServerClock = true
verifyNotLeaseHolderErrors(t, baRead, repls, 2)
} else {
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
})
}
})
}

// Test that, during a merge, the closed timestamp of the subsumed RHS doesn't
Expand Down
17 changes: 11 additions & 6 deletions pkg/kv/kvserver/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,23 @@ var FollowerReadsEnabled = settings.RegisterBoolSetting(
// advanced closed timestamp.
func BatchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool {
// Explanation of conditions:
// 1. the batch needs to be part of a transaction, because non-transactional
// batches often rely on the server setting their timestamp. If a follower
// with a lagging clock sets their timestamp then they might miss past
// writes served at higher timestamps.
// 1. the batch cannot have or intend to receive a timestamp set from a
// server-side clock. If a follower with a lagging clock sets its timestamp
// and this then allows the follower to evaluate the batch as a follower
// read, then the batch might miss past writes served at higher timestamps
// on the leaseholder.
// 2. each request in the batch needs to be "transactional", because those are
// the only ones that have clearly defined semantics when served under the
// closed timestamp.
// 3. the batch needs to be read-only, because a follower replica cannot
// propose writes to Raft.
// 4. the batch needs to be non-locking, because unreplicated locks are only
// held on the leaseholder.
return ba.Txn != nil && ba.IsAllTransactional() && ba.IsReadOnly() && !ba.IsLocking()
tsFromServerClock := ba.Txn == nil && (ba.Timestamp.IsEmpty() || ba.TimestampFromServerClock)
if tsFromServerClock {
return false
}
return ba.IsAllTransactional() && ba.IsReadOnly() && !ba.IsLocking()
}

// canServeFollowerReadRLocked tests, when a range lease could not be acquired,
Expand Down Expand Up @@ -83,7 +88,7 @@ func (r *Replica) canServeFollowerReadRLocked(
return false
}

requiredFrontier := ba.Txn.RequiredFrontier()
requiredFrontier := ba.RequiredFrontier()
maxClosed, _ := r.maxClosedRLocked(ctx, requiredFrontier /* sufficient */)
canServeFollowerRead := requiredFrontier.LessEq(maxClosed)
tsDiff := requiredFrontier.GoTime().Sub(maxClosed.GoTime())
Expand Down
Loading

0 comments on commit e024445

Please sign in to comment.