kv: don't consider lease start time as closed timestamp
Fixes #60929.
Relates to #61986.
Relates to #61989.

This commit fixes a closed timestamp violation that could allow a
value/intent write at a timestamp below a range's closed timestamp. Such
a write could cause serializability violations if it allowed a follower
read to miss the write, and it could trigger a panic in the rangefeed
processor if a rangefeed was watching at the right time, as we saw in
#60929.

In #60929, we found that this bug was caused by a range merge and a
lease transfer racing in such a way that the closed timestamp could
later be violated by a write to the subsumed portion of the joint range.
The root cause of this was an opportunistic optimization made in 7037b54
to consider a range's lease start time as an input to its closed
timestamp computation. This optimization did not account for the
possibility of serving writes to a newly subsumed keyspace below a
range's lease start time if that keyspace was merged into a range under
its current lease and with a freeze time below the current lease start
time. This bug is fixed by removing the optimization, which was on its
way out to allow for #61986 anyway.
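
To make the race concrete, here is a minimal Go sketch of the failure
mode, using plain integers as stand-in timestamps. The names and numbers
are illustrative only and are not CockroachDB APIs:

package main

import "fmt"

// Illustrative-only sketch of the race from #60929.
func main() {
	const (
		rhsFreezeTime = 5  // merge subsumes the RHS with freeze time 5
		lhsLeaseStart = 10 // LHS lease transferred mid-merge; new lease starts at 10
	)

	// Old (buggy) behavior: the LHS treated its lease start time as closed,
	// and the merged range inherited that closed timestamp.
	closedTS := lhsLeaseStart

	// After the merge, a write to the formerly-RHS keyspace only needs to
	// evaluate above the freeze time, so it can land below the advertised
	// closed timestamp.
	writeTS := rhsFreezeTime + 1
	if writeTS <= closedTS {
		fmt.Printf("closed timestamp violated: write at %d <= closed %d\n", writeTS, closedTS)
	}
}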

Note that removing this optimization does not break
`TestClosedTimestampCanServeThroughoutLeaseTransfer`, because the v2
closed timestamp system does not allow for closed timestamp regressions,
even across leaseholders. This was one of the many benefits of the new
system.
nvanbenschoten committed Mar 26, 2021
1 parent 8b137b4 commit dd97120
Showing 3 changed files with 124 additions and 10 deletions.
111 changes: 111 additions & 0 deletions pkg/kv/kvserver/client_merge_test.go
@@ -2000,6 +2000,117 @@ func TestStoreRangeMergeRHSLeaseTransfers(t *testing.T) {
	require.NoError(t, <-mergeErr)
}

// TestStoreRangeMergeLHSLeaseTransfersAfterFreezeTime verifies that in cases
// where the lease start time on a LHS range is moved above the freeze time of a
// range merge, the combined range after the merge does not broadcast a closed
// timestamp that it then allows to be violated.
//
// This is a regression test for #60929. In that issue, which was discovered by
// kvnemesis, we found that a range merge and a lease transfer could race in
// such a way that the closed timestamp could later be violated by a write to
// the subsumed portion of the joint range. The root cause of this was an
// opportunistic optimization made in 7037b54 to consider a range's lease start
// time as an input to its closed timestamp computation. This optimization did
// not account for the possibility of serving writes to a newly subsumed
// keyspace below a range's lease start time if that keyspace was merged into a
// range under its current lease and with a freeze time below the current lease
// start time. This bug was fixed by removing the optimization, which was on its
// way out to allow for #61986 anyway.
func TestStoreRangeMergeLHSLeaseTransfersAfterFreezeTime(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Install a hook to control when the merge transaction subsumes the RHS.
	// Put this in a sync.Once to ignore retries.
	var once sync.Once
	subsumeReceived := make(chan struct{})
	finishSubsume := make(chan struct{})
	testingResponseFilter := func(_ context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
		if ba.IsSingleSubsumeRequest() {
			once.Do(func() {
				subsumeReceived <- struct{}{}
				<-finishSubsume
			})
		}
		return nil
	}

	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 2,
		base.TestClusterArgs{
			ReplicationMode: base.ReplicationManual,
			ServerArgs: base.TestServerArgs{
				Knobs: base.TestingKnobs{
					Store: &kvserver.StoreTestingKnobs{
						TestingResponseFilter: testingResponseFilter,
					},
				},
			},
		})
	defer tc.Stopper().Stop(ctx)

	// Create the ranges to be merged. Put both ranges on both stores. Give the
	// first store the lease on the LHS and the second store the lease on the
	// RHS. Before the merge completes, we'll transfer the LHS's lease to the
	// second store so that the two leaseholders are collocated.
	lhsDesc, rhsDesc, err := tc.Servers[0].ScratchRangeEx()
	require.NoError(t, err)

	tc.AddVotersOrFatal(t, lhsDesc.StartKey.AsRawKey(), tc.Target(1))
	tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Target(1))
	tc.TransferRangeLeaseOrFatal(t, lhsDesc, tc.Target(0))
	tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(1))

	// Launch the merge.
	mergeErr := make(chan error, 1)
	_ = tc.Stopper().RunAsyncTask(ctx, "merge", func(context.Context) {
		args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
		_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), args)
		mergeErr <- pErr.GoError()
	})

	// Wait for the merge transaction to send its Subsume request. It won't
	// be able to complete just yet, thanks to the hook we installed above.
	<-subsumeReceived

	// Transfer the lease on the LHS to the second store. Doing this will
	// increase the lease start time on the LHS past the freeze time of the
	// range merge.
	if err := tc.TransferRangeLease(lhsDesc, tc.Target(1)); err != nil {
		close(finishSubsume) // don't abandon merge
		t.Fatalf("could not transfer lease for range %s: %+v", lhsDesc, err)
	}

	store1 := tc.GetFirstStoreFromServer(t, 1)
	lhsLeaseholder := store1.LookupReplica(lhsDesc.StartKey)
	testutils.SucceedsSoon(t, func() error {
		// Wait for the new leaseholder to notice that it received the lease.
		now := tc.Servers[1].Clock().NowAsClockTimestamp()
		if !lhsLeaseholder.OwnsValidLease(ctx, now) {
			return errors.New("not leaseholder")
		}
		return nil
	})
	lhsClosedTS, ok := lhsLeaseholder.MaxClosed(ctx)
	require.True(t, ok)

	// Finally, allow the merge to complete. It should complete successfully.
	close(finishSubsume)
	require.NoError(t, <-mergeErr)

	// Attempt to write below the closed timestamp, to the subsumed keyspace.
	// The write's timestamp should be forwarded to after the closed timestamp.
	// If it is not, we have violated the closed timestamp's promise!
	var ba roachpb.BatchRequest
	ba.Timestamp = lhsClosedTS.Prev()
	ba.RangeID = lhsDesc.RangeID
	ba.Add(incrementArgs(rhsDesc.StartKey.AsRawKey().Next(), 1))
	br, pErr := tc.Servers[1].DistSender().Send(ctx, ba)
	require.Nil(t, pErr)
	require.NotEqual(t, ba.Timestamp, br.Timestamp, "write timestamp not bumped")
	require.True(t, lhsClosedTS.Less(br.Timestamp), "write timestamp not bumped above closed timestamp")
}

// TestStoreRangeMergeCheckConsistencyAfterSubsumption verifies the following:
// 1. While a range is subsumed, ComputeChecksum requests wait until the merge
// is complete before proceeding.
13 changes: 4 additions & 9 deletions pkg/kv/kvserver/closed_timestamp_test.go
@@ -411,29 +411,24 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) {
 	ctx := context.Background()
 	// Set up the target duration to be very long and rely on lease transfers to
 	// drive MaxClosed.
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, time.Hour, testingCloseFraction,
-		aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration,
+		testingCloseFraction, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc, numNodes)
 
 	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
 		t.Fatal(err)
 	}
 
-	// Grab a timestamp before initiating a lease transfer, transfer the lease,
-	// then ensure that reads at that timestamp can occur from all the replicas.
+	// Verify that we can serve a follower read at a timestamp. Wait if necessary.
 	ts := tc.Server(0).Clock().Now()
-	lh := getCurrentLeaseholder(t, tc, desc)
-	target := pickRandomTarget(tc, lh, desc)
-	require.Nil(t, tc.TransferRangeLease(desc, target))
 	baRead := makeTxnReadBatchForDesc(desc, ts)
 	testutils.SucceedsSoon(t, func() error {
 		return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
 	})
 
 	// Update the batch to simulate a transaction that has a global uncertainty
-	// limit after the lease transfer. Keep its read timestamp from before the
-	// lease transfer.
+	// limit after the lease transfer. Keep its read timestamp the same.
 	baRead.Txn.GlobalUncertaintyLimit = tc.Server(0).Clock().Now().Add(time.Second.Nanoseconds(), 0)
 	// Send the request to all three replicas. One should succeed and
 	// the other two should return NotLeaseHolderErrors.
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/replica_follower_read.go
@@ -197,9 +197,17 @@ func (r *Replica) maxClosedRLocked(
 	// Look at the legacy closed timestamp propagation mechanism.
 	maxClosed := r.store.cfg.ClosedTimestamp.Provider.MaxClosed(
 		lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), appliedLAI)
-	maxClosed.Forward(lease.Start.ToTimestamp())
 	maxClosed.Forward(initialMaxClosed)
 
+	// If the range has not upgraded to the new closed timestamp system,
+	// continue using the lease start time as an input to the range's closed
+	// timestamp. Otherwise, ignore it. We expect to delete this code soon, but
+	// we keep it around for now to avoid a regression in follower read
+	// availability in mixed v20.2/v21.1 clusters.
+	if replicaStateClosed.IsEmpty() {
+		maxClosed.Forward(lease.Start.ToTimestamp())
+	}
+
 	// Look at the "new" closed timestamp propagation mechanism.
 	maxClosed.Forward(replicaStateClosed)
 	maxClosed.Forward(sideTransportClosed)
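
For readers without the rest of maxClosedRLocked in front of them, the net
effect of this hunk can be summarized in a short illustrative sketch. The
function and parameter names below are made up; only the combination rule
mirrors the diff above.

package kvserver

import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// combineClosedInputs is an illustrative summary of how the closed timestamp
// inputs are combined after this change. The lease start time only counts for
// ranges still on the legacy (v20.2) closed timestamp system, i.e. those with
// an empty replicaStateClosed; previously it counted unconditionally.
func combineClosedInputs(
	legacyClosed, initialMaxClosed, leaseStart,
	replicaStateClosed, sideTransportClosed hlc.Timestamp,
) hlc.Timestamp {
	maxClosed := legacyClosed
	maxClosed.Forward(initialMaxClosed)
	if replicaStateClosed.IsEmpty() {
		maxClosed.Forward(leaseStart)
	}
	maxClosed.Forward(replicaStateClosed)
	maxClosed.Forward(sideTransportClosed)
	return maxClosed
}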
