From 467ae98aa8b11377e94cbb6a468564c187fa0f9f Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Wed, 14 Apr 2021 15:26:44 -0400 Subject: [PATCH] kvserver: fix write below closedts bug This patch fixes a bug in our closed timestamp management. This bug was making it possible for a command to close a timestamp even though other requests writing at lower timestamps are currently evaluating. The problem was that we were assuming that, if a replica is proposing a new lease, there can't be any requests in flight and every future write evaluated on the range will wait for the new lease and thus evaluate above the lease start time. Based on that reasoning, the proposal buffer was recording the lease start time as its assignedClosedTimestamp. This matched what it does for every write, where assignedClosedTimestamp corresponds to the closed timestamp carried by the command. It turns out that the replica's reasoning was wrong. It is, in fact, possible for writes to be evaluating on the range when the lease acquisition is proposed. And these evaluations might be done at timestamps below the would-be lease's start time. This happens when the replica has already received a lease through a lease transfer. The transfer must have applied after the previous lease expired and the replica decided to start acquiring a new one. This fixes one of the assertion failures seen in #62655. Release note (bug fix): A bug leading to crashes with the message "writing below closed ts" has been fixed. --- pkg/kv/kvserver/helpers_test.go | 9 + pkg/kv/kvserver/replica_closedts_test.go | 243 ++++++++++++++++++- pkg/kv/kvserver/replica_init.go | 1 + pkg/kv/kvserver/replica_proposal_buf.go | 39 ++- pkg/kv/kvserver/replica_proposal_buf_test.go | 90 ++++--- pkg/kv/kvserver/testing_knobs.go | 5 + 6 files changed, 346 insertions(+), 41 deletions(-) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index a32673dc1a39..eeea580b89a5 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "go.etcd.io/etcd/raft/v3" ) @@ -546,3 +547,11 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { } } } + +// AcquireLease is redirectOnOrAcquireLease exposed for tests. +func (r *Replica) AcquireLease(ctx context.Context) (kvserverpb.LeaseStatus, error) { + ctx = r.AnnotateCtx(ctx) + ctx = logtags.AddTag(ctx, "lease-acq", nil) + l, pErr := r.redirectOnOrAcquireLease(ctx) + return l, pErr.GoError() +} diff --git a/pkg/kv/kvserver/replica_closedts_test.go b/pkg/kv/kvserver/replica_closedts_test.go index 87a0b7d95638..a68484da4668 100644 --- a/pkg/kv/kvserver/replica_closedts_test.go +++ b/pkg/kv/kvserver/replica_closedts_test.go @@ -13,6 +13,7 @@ package kvserver_test import ( "context" "sync" + "sync/atomic" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -33,9 +34,9 @@ import ( ) // TestBumpSideTransportClosed tests the various states that a replica can find -// itself in when its TestBumpSideTransportClosed is called. It verifies that -// the method only returns successfully if it can bump its closed timestamp to -// the target. +// itself in when its BumpSideTransportClosed is called. It verifies that the +// method only returns successfully if it can bump its closed timestamp to the +// target. 
func TestBumpSideTransportClosed(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -413,6 +414,242 @@ func TestBumpSideTransportClosed(t *testing.T) { } } +// Test that a lease proposal that gets rejected doesn't erroneously dictate the +// closed timestamp of further requests. If it would, then writes could violate +// that closed timestamp. The tricky scenario tested is the following: +// +// 1. A lease held by rep1 is getting close to its expiration. +// 2. Rep1 begins the process of transferring its lease to rep2 with a start +// time of 100. +// 3. The transfer goes slowly. From the perspective of rep2, the original lease +// expires, so it begins acquiring a new lease with a start time of 200. The +// lease acquisition is slow to propose. +// 4. The lease transfer finally applies. Rep2 is the new leaseholder and bumps +// its tscache to 100. +// 5. Two writes start evaluating on rep2 under the new lease. They bump their +// write timestamp to 100,1. +// 6. Rep2's lease acquisition from step 3 is proposed. Here's where the +// regression that this test is protecting against comes in: if rep2 were to +// mechanically bump its assignedClosedTimestamp to 200, that'd be incorrect +// because there are in-flight writes at 100. If those writes get proposed +// after the lease acquisition request, the second of them to get proposed +// would violate the closed timestamp carried by the first (see below). +// 7. The lease acquisition gets rejected below Raft because the previous lease +// it asserts doesn't correspond to the lease that it applies under. +// 8. The two writes from step 5 are proposed. The closed timestamp that they +// each carry has a lower bound of rep2.assignedClosedTimestamp. If this were +// 200, then the second one would violate the closed timestamp carried by the +// first one - the first one says that 200 is closed, but then the second +// tries to write at 100. Note that the first write is OK writing at 100 even +// though it carries a closed timestamp of 200 - the closed timestamp carried +// by a command only binds future commands. +// +// The test simulates the scenario and verifies that we don't crash with a +// closed timestamp violation assertion. We avoid the violation because, in step +// 6, the lease proposal doesn't bump the assignedClosedTimestamp. +func TestRejectedLeaseDoesntDictateClosedTimestamp(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + // We're going to orchestrate the scenario by controlling the timing of the + // lease transfer, the lease acquisition and the writes. Note that we'll block + // the lease acquisition and the writes after they evaluate but before they + // get proposed, but we'll block the lease transfer when it's just about to be + // proposed, after it gets assigned the closed timestamp that it will carry. + // We want it to carry a relatively low closed timestamp, so we want its + // closed timestamp to be assigned before we bump the clock to expire the + // original lease. + + // leaseTransferCh is used to block the lease transfer. + leaseTransferCh := make(chan struct{}) + // leaseAcqCh is used to block the lease acquisition. + leaseAcqCh := make(chan struct{}) + // writeCh is used to wait for the two writes to block. + writeCh := make(chan struct{}) + // unblockWritesCh is used to unblock the two writes. 
+ unblockWritesCh := make(chan struct{}) + var writeKey1, writeKey2 atomic.Value + // Initialize the atomics so they get bound to a specific type. + writeKey1.Store(roachpb.Key{}) + writeKey2.Store(roachpb.Key{}) + var blockedRangeID int64 + var trappedLeaseAcquisition int64 + + blockLeaseAcquisition := func(args kvserverbase.FilterArgs) { + blockedRID := roachpb.RangeID(atomic.LoadInt64(&blockedRangeID)) + leaseReq, ok := args.Req.(*roachpb.RequestLeaseRequest) + if !ok || args.Hdr.RangeID != blockedRID || leaseReq.Lease.Replica.NodeID != 2 { + return + } + if atomic.CompareAndSwapInt64(&trappedLeaseAcquisition, 0, 1) { + leaseAcqCh <- struct{}{} + <-leaseAcqCh + } + } + + blockWrites := func(args kvserverbase.FilterArgs) { + wk1 := writeKey1.Load().(roachpb.Key) + wk2 := writeKey2.Load().(roachpb.Key) + if put, ok := args.Req.(*roachpb.PutRequest); ok && (put.Key.Equal(wk1) || put.Key.Equal(wk2)) { + writeCh <- struct{}{} + <-unblockWritesCh + } + } + + blockTransfer := func(p *kvserver.ProposalData) { + blockedRID := roachpb.RangeID(atomic.LoadInt64(&blockedRangeID)) + ba := p.Request + if ba.RangeID != blockedRID { + return + } + _, ok := p.Request.GetArg(roachpb.TransferLease) + if !ok { + return + } + leaseTransferCh <- struct{}{} + <-leaseTransferCh + } + + manual := hlc.NewHybridManualClock() + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manual.UnixNano, + }, + Store: &kvserver.StoreTestingKnobs{ + DisableConsistencyQueue: true, + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingPostEvalFilter: func(args kvserverbase.FilterArgs) *roachpb.Error { + blockWrites(args) + blockLeaseAcquisition(args) + return nil + }, + }, + TestingProposalSubmitFilter: func(p *kvserver.ProposalData) (drop bool, _ error) { + blockTransfer(p) + return false, nil + }, + }, + }, + }}) + defer tc.Stopper().Stop(ctx) + + manual.Pause() + // Upreplicate a range. + n1, n2 := tc.Servers[0], tc.Servers[1] + // One of the filters hardcodes a node id. + require.Equal(t, roachpb.NodeID(2), n2.NodeID()) + key := tc.ScratchRangeWithExpirationLease(t) + s1 := tc.GetFirstStoreFromServer(t, 0) + t1, t2 := tc.Target(0), tc.Target(1) + repl0 := s1.LookupReplica(keys.MustAddr(key)) + desc := *repl0.Desc() + require.NotNil(t, repl0) + tc.AddVotersOrFatal(t, key, t2) + require.NoError(t, tc.WaitForVoters(key, t2)) + // Make sure the lease starts off on n1. + lease, _ /* now */, err := tc.FindRangeLease(desc, &t1 /* hint */) + require.NoError(t, err) + require.Equal(t, n1.NodeID(), lease.Replica.NodeID) + + // Advance the time a bit. We'll then initiate a transfer, and we want the + // transferred lease to be valid for a while after the original lease expires. + remainingNanos := lease.GetExpiration().WallTime - manual.UnixNano() + // NOTE: We don't advance the clock past the mid-point of the lease, otherwise + // it gets extended. + pause1 := remainingNanos / 3 + manual.Increment(pause1) + + // Start a lease transfer from n1 to n2. We'll block the proposal of the transfer for a while. + atomic.StoreInt64(&blockedRangeID, int64(desc.RangeID)) + transferErrCh := make(chan error) + go func() { + transferErrCh <- tc.TransferRangeLease(desc, t2) + }() + defer func() { + require.NoError(t, <-transferErrCh) + }() + // Wait for the lease transfer to evaluate and then block. 
+ <-leaseTransferCh + // With the lease transfer still blocked, we now advance the clock beyond the + // original lease's expiration and we make n2 try to acquire a lease. This + // lease acquisition request will also be blocked. + manual.Increment(remainingNanos - pause1 + 1) + leaseAcqErrCh := make(chan error) + go func() { + r, _, err := n2.Stores().GetReplicaForRangeID(ctx, desc.RangeID) + if err != nil { + leaseAcqErrCh <- err + return + } + _, err = r.AcquireLease(ctx) + leaseAcqErrCh <- err + }() + // Wait for the lease acquisition to be blocked. + select { + case <-leaseAcqCh: + case err := <-leaseAcqErrCh: + t.Fatalf("lease request unexpectedly finished. err: %v", err) + } + // Let the previously blocked transfer succeed. n2's lease acquisition remains + // blocked. + close(leaseTransferCh) + // Wait until n2 has applied the lease transfer. + desc = *repl0.Desc() + testutils.SucceedsSoon(t, func() error { + li, _ /* now */, err := tc.FindRangeLeaseEx(ctx, desc, &t2 /* hint */) + if err != nil { + return err + } + lease = li.Current() + if !lease.OwnedBy(n2.GetFirstStoreID()) { + return errors.Errorf("n2 still unaware of its lease: %s", li.Current()) + } + return nil + }) + + // Now we send two writes. We'll block them after evaluation. Then we'll + // unblock the lease acquisition, let the respective command fail to apply, + // and then we'll unblock the writes. + err1 := make(chan error) + err2 := make(chan error) + go func() { + writeKey1.Store(key) + sender := n2.DB().NonTransactionalSender() + pArgs := putArgs(key, []byte("test val")) + _, pErr := kv.SendWrappedWith(ctx, sender, roachpb.Header{Timestamp: lease.Start.ToTimestamp()}, pArgs) + err1 <- pErr.GoError() + }() + go func() { + k := key.Next() + writeKey2.Store(k) + sender := n2.DB().NonTransactionalSender() + pArgs := putArgs(k, []byte("test val2")) + _, pErr := kv.SendWrappedWith(ctx, sender, roachpb.Header{Timestamp: lease.Start.ToTimestamp()}, pArgs) + err2 <- pErr.GoError() + }() + // Wait for the writes to evaluate and block before proposal. + <-writeCh + <-writeCh + + // Unblock the lease acquisition. + close(leaseAcqCh) + if err := <-leaseAcqErrCh; err != nil { + close(unblockWritesCh) + t.Fatal(err) + } + + // Now unblock the writes. + close(unblockWritesCh) + require.NoError(t, <-err1) + require.NoError(t, <-err2) + // Not crashing with a closed timestamp violation assertion marks the success + // of this test. +} + // BenchmarkBumpSideTransportClosed measures the latency of a single call to // (*Replica).BumpSideTransportClosed. 
The closed timestamp side-transport was // designed with a performance expectation of this check taking no more than diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index c64027597c26..203762749be1 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -99,6 +99,7 @@ func newUnloadedReplica( r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings()) r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps + r.mu.proposalBuf.testing.submitProposalFilter = store.cfg.TestingKnobs.TestingProposalSubmitFilter if leaseHistoryMaxEntries > 0 { r.leaseHistory = newLeaseHistory() diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 134b8b4e11c7..cccb15f2ada6 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -726,6 +726,32 @@ func (b *propBuf) assignClosedTimestampToProposalLocked( // For brand new leases, we close the lease start time. Since this proposing // replica is not the leaseholder, the previous target is meaningless. closedTSTarget = newLease.Start.ToTimestamp() + // We forward closedTSTarget to b.assignedClosedTimestamp. We surprisingly + // might have previously closed a timestamp above the lease start time - + // when we close timestamps in the future, then attempt to transfer our + // lease away (and thus proscribe it) but the transfer fails and we're now + // acquiring a new lease to replace the proscribed one. + // + // TODO(andrei,nvanbenschoten): add a test with scenario: + // - Acquire lease @ 1 + // - Close future timestamp @ 3 + // - Attempt to transfer lease @ 2 + // - Reject + // - Reacquire lease @ 2 + closedTSTarget.Forward(b.assignedClosedTimestamp) + + // Note that we're not bumping b.assignedClosedTimestamp here (we're not + // calling forwardClosedTimestampLocked). Bumping it to the lease start time + // would (surprisingly?) be illegal: just because we're proposing a lease + // starting at timestamp 100, doesn't mean we're sure to not be in the + // process of evaluating requests writing below 100. This can happen if a + // lease transfer has already applied while we were evaluating this lease + // request, and if we've already started evaluating writes under the + // transferred lease. Such a transfer can give us the lease starting at + // timestamp 50. If such a transfer applied, then our lease request that + // we're proposing now is sure to not apply. But if we were to bump + // b.assignedClosedTimestamp, the damage would be done. See + // TestRejectedLeaseDoesntDictateClosedTimestamp. } else { // Sanity check that this command is not violating the closed timestamp. It // must be writing at a timestamp above assignedClosedTimestamp @@ -752,15 +778,18 @@ func (b *propBuf) assignClosedTimestampToProposalLocked( } // We can't close timestamps above the current lease's expiration. closedTSTarget.Backward(p.leaseStatus.ClosedTimestampUpperBound()) + + // We're about to close closedTSTarget. The propBuf needs to remember that + // in order for incoming requests to be bumped above it (through + // TrackEvaluatingRequest). + if !b.forwardClosedTimestampLocked(closedTSTarget) { + closedTSTarget = b.assignedClosedTimestamp + } } - // We're about to close closedTSTarget. 
The propBuf needs to remember that in - // order for incoming requests to be bumped above it (through - // TrackEvaluatingRequest). - b.forwardClosedTimestampLocked(closedTSTarget) // Fill in the closed ts in the proposal. f := &b.tmpClosedTimestampFooter - f.ClosedTimestamp = b.assignedClosedTimestamp + f.ClosedTimestamp = closedTSTarget footerLen := f.Size() if log.ExpensiveLogEnabled(ctx, 4) { log.VEventf(ctx, 4, "attaching closed timestamp %s to proposal %x", b.assignedClosedTimestamp, p.idKey) diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 7046a2a008c8..b032f44d9076 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -762,37 +762,48 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { // proposed. lease roachpb.Lease + // expClosed is the expected closed timestamp carried by the proposal. Empty + // means the proposal is not expected to carry a closed timestamp update. expClosed hlc.Timestamp + // expAssignedClosedBumped, if set, means that the test expects + // b.assignedClosedTimestamp to be bumped before proposing. If not set, then + // the test expects b.assignedClosedTimestamp to be left at + // prevClosedTimestamp, regardless of whether the proposal carries a closed + // timestamp or not (expClosed). + expAssignedClosedBumped bool }{ { - name: "basic", - reqType: regularWrite, - trackerLowerBound: hlc.Timestamp{}, - leaseExp: hlc.MaxTimestamp, - rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, - prevClosedTimestamp: hlc.Timestamp{}, - expClosed: nowMinusClosedLag, + name: "basic", + reqType: regularWrite, + trackerLowerBound: hlc.Timestamp{}, + leaseExp: hlc.MaxTimestamp, + rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, + prevClosedTimestamp: hlc.Timestamp{}, + expClosed: nowMinusClosedLag, + expAssignedClosedBumped: true, }, { // The request tracker will prevent us from closing below its lower bound. - name: "not closing below evaluating requests", - reqType: regularWrite, - trackerLowerBound: nowMinusTwiceClosedLag, - leaseExp: hlc.MaxTimestamp, - rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, - prevClosedTimestamp: hlc.Timestamp{}, - expClosed: nowMinusTwiceClosedLag.FloorPrev(), + name: "not closing below evaluating requests", + reqType: regularWrite, + trackerLowerBound: nowMinusTwiceClosedLag, + leaseExp: hlc.MaxTimestamp, + rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, + prevClosedTimestamp: hlc.Timestamp{}, + expClosed: nowMinusTwiceClosedLag.FloorPrev(), + expAssignedClosedBumped: true, }, { // Like the basic test, except that we can't close timestamp below what // we've already closed previously. - name: "no regression", - reqType: regularWrite, - trackerLowerBound: hlc.Timestamp{}, - leaseExp: hlc.MaxTimestamp, - rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, - prevClosedTimestamp: someClosedTS, - expClosed: someClosedTS, + name: "no regression", + reqType: regularWrite, + trackerLowerBound: hlc.Timestamp{}, + leaseExp: hlc.MaxTimestamp, + rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, + prevClosedTimestamp: someClosedTS, + expClosed: someClosedTS, + expAssignedClosedBumped: false, }, { name: "brand new lease", @@ -808,6 +819,11 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { leaseExp: expiredLeaseTimestamp, rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, expClosed: now.ToTimestamp(), + // Check that the lease proposal does not bump b.assignedClosedTimestamp. 
+ // The proposer cannot make promises about the write timestamps of further + // requests based on the start time of a proposed lease. See comments in + // propBuf.assignClosedTimestampToProposalLocked(). + expAssignedClosedBumped: false, }, { name: "lease extension", @@ -824,7 +840,8 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, // Lease extensions don't carry closed timestamps because they don't get // MLAIs, and so they can be reordered. - expClosed: hlc.Timestamp{}, + expClosed: hlc.Timestamp{}, + expAssignedClosedBumped: false, }, { // Lease transfers behave just like regular writes. The lease start time @@ -835,21 +852,23 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { Sequence: curLease.Sequence + 1, Start: now, }, - trackerLowerBound: hlc.Timestamp{}, - leaseExp: hlc.MaxTimestamp, - rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, - expClosed: nowMinusClosedLag, + trackerLowerBound: hlc.Timestamp{}, + leaseExp: hlc.MaxTimestamp, + rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, + expClosed: nowMinusClosedLag, + expAssignedClosedBumped: true, }, { // With the LEAD_FOR_GLOBAL_READS policy, we're expecting to close // timestamps in the future. - name: "global range", - reqType: regularWrite, - trackerLowerBound: hlc.Timestamp{}, - leaseExp: hlc.MaxTimestamp, - rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS, - prevClosedTimestamp: hlc.Timestamp{}, - expClosed: nowPlusGlobalReadLead, + name: "global range", + reqType: regularWrite, + trackerLowerBound: hlc.Timestamp{}, + leaseExp: hlc.MaxTimestamp, + rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS, + prevClosedTimestamp: hlc.Timestamp{}, + expClosed: nowPlusGlobalReadLead, + expAssignedClosedBumped: true, }, } { t.Run(tc.name, func(t *testing.T) { @@ -899,6 +918,11 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { require.NoError(t, err) require.NoError(t, b.flushLocked(ctx)) checkClosedTS(t, r, tc.expClosed) + if tc.expAssignedClosedBumped { + require.Equal(t, tc.expClosed, b.assignedClosedTimestamp) + } else { + require.Equal(t, tc.prevClosedTimestamp, b.assignedClosedTimestamp) + } }) } } diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 59378df61af4..811aa512980e 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -47,6 +47,11 @@ type StoreTestingKnobs struct { // TestingProposalFilter is called before proposing each command. TestingProposalFilter kvserverbase.ReplicaProposalFilter + // TestingProposalSubmitFilter can be used by tests to observe and optionally + // drop Raft proposals before they are handed to etcd/raft to begin the + // process of replication. Dropped proposals are still eligible to be + // reproposed due to ticks. + TestingProposalSubmitFilter func(*ProposalData) (drop bool, err error) // TestingApplyFilter is called before applying the results of a // command on each replica. If it returns an error, the command will