Skip to content

Commit

Permalink
kvserver: fix write below closedts bug
Browse files Browse the repository at this point in the history
This patch fixes a bug in our closed timestamp management. This bug was
making it possible for a command to close a timestamp even though other
requests writing at lower timestamps are currently evaluating. The
problem was that we were assuming that, if a replica is proposing a new
lease, there can't be any requests in flight and every future write
evaluated on the range will wait for the new lease and the evaluate
above the lease start time. Based on that reasoning, the proposal buffer
was recording the lease start time as its assignedClosedTimestamp. This
was matching what it does for every write, where assignedClosedTimestamp
corresponds to the the closed timestamp carried by the command.

It turns out that the replica's reasoning was wrong. It is, in fact,
possible for writes to be evaluating on the range when the lease
acquisition is proposed. And these evaluations might be done at
timestamps below the would-be lease's start time. This happens when the
replica has already received a lease through a lease transfer. The
transfer must have applied after the previous lease expired and the
replica decided to start acquiring a new one.

This fixes one of the assertion failures seen in #62655.

Release note (bug fix): A bug leading to crashes with the message
"writing below closed ts" has been fixed.
  • Loading branch information
andreimatei committed Apr 14, 2021
1 parent 8b20bef commit 543e94e
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 8 deletions.
223 changes: 220 additions & 3 deletions pkg/kv/kvserver/replica_closedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvserver_test
import (
"context"
"sync"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -33,9 +34,9 @@ import (
)

// TestBumpSideTransportClosed tests the various states that a replica can find
// itself in when its TestBumpSideTransportClosed is called. It verifies that
// the method only returns successfully if it can bump its closed timestamp to
// the target.
// itself in when its BumpSideTransportClosed is called. It verifies that the
// method only returns successfully if it can bump its closed timestamp to the
// target.
func TestBumpSideTransportClosed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -413,6 +414,222 @@ func TestBumpSideTransportClosed(t *testing.T) {
}
}

// Test that a lease proposal that gets rejected doesn't erroneously dictate the
// closed timestamp of further requests. If it would, then writes could violate
// that closed timestamp. The tricky scenario tested is the following:
//
// 1. A lease held by rep1 is getting close to its expiration.
// 2. Rep1 begins the process of transferring its lease to rep2 with a start time of 100.
// 3. The transfer goes slowly. From the perspective of rep2, the original lease
// expires, so it begins acquiring a new lease with a start time of 200. The
// lease acquisition is slow to propose.
// 4. The lease transfer finally applies. Rep2 is the new leaseholder and bumps
// its tscache to 100.
// 5. Two writes start evaluating on rep2 under the new lease. They bump their
// write timestamp to 100.
// 6. Rep2's lease acquisition from step 3 is proposed. Here's where the
// regression that this test is protecting against comes in: if rep2 was to
// mechanically bump its assignedClosedTimestamp to 200, that'd be incorrect
// because there are in-flight writes at 100. If those writes get proposed
// after the lease acquisition request, the second of them to get proposed
// would violate the closed time carried by the first (see below).
// 7. The lease acquisition gets rejected below Raft because the previous lease
// it asserts doesn't correspond to the lease that it applies under.
// 8. The two writes from step 5 are proposed. The closed timestamp that they
// each carry has a lower bound of rep2.assignedClosedTimestmap. If this was
// 200, then the second one would violate the closed timestamp carried by the
// first one - the first one says that 200 is closed, but then the second
// tries to write at 100. Note that the first write is OK writing at 100 even
// though it carries a closed timestmap of 200 - the closed timestamp carried
// by a command only binds future commands.
//
// The test simulates the scenario and verifies that we don't crash with a
// closed timestamp violation assertion. We avoid the violation because, in step
// 6, the lease proposal doesn't bump the assignedClosedTimestamp.
func TestRejectedLeaseDoesntDictateClosedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

// We're going to orchestrate the scenario by controlling the timing of the
// lease transfer, the lease acquisition and the writes. Note that we'll block
// the lease acquisition and the writes after they evaluate but before they
// get proposed, but we'll block the lease transfer when it's just about to be
// proposed, after it gets assigned the closed timestamp that it will carry.
// We want it to carry a relatively low closed timestamp, so we want its
// closed timestamp to be assigned before we bump the clock to expire the
// original lease.

// leaseTransferCh is used to block the lease transfer.
leaseTransferCh := make(chan struct{})
// leaseAcqCh is used to block the lease acquisition.
leaseAcqCh := make(chan struct{})
// writeCh is used to wait for the two writes to block.
writeCh := make(chan struct{})
// unblockWritesCh is used to unblock the two writes.
unblockWritesCh := make(chan struct{})
var writeKey1, writeKey2 atomic.Value
// Initialize the atomics do they get bound to a specific type.
writeKey1.Store(roachpb.Key{})
writeKey2.Store(roachpb.Key{})
var blockedRangeID int64
var trappedLeaseAcquisition int64

blockLeaseAcquisition := func(args kvserverbase.FilterArgs) {
blockedRID := roachpb.RangeID(atomic.LoadInt64(&blockedRangeID))
if _, ok := args.Req.(*roachpb.RequestLeaseRequest); !ok || args.Hdr.RangeID != blockedRID {
return
}
if atomic.LoadInt64(&trappedLeaseAcquisition) == 1 {
// We only trap the first lease acquisition request on the respective
// range.
return
}
atomic.StoreInt64(&trappedLeaseAcquisition, 1)
leaseAcqCh <- struct{}{}
<-leaseAcqCh
return
}

blockWrites := func(args kvserverbase.FilterArgs) {
wk1 := writeKey1.Load().(roachpb.Key)
wk2 := writeKey2.Load().(roachpb.Key)
if put, ok := args.Req.(*roachpb.PutRequest); ok && (put.Key.Equal(wk1) || put.Key.Equal(wk2)) {
writeCh <- struct{}{}
<-unblockWritesCh
}
}

blockTransfer := func(ctx context.Context, p *kvserver.ProposalData) {
blockedRID := roachpb.RangeID(atomic.LoadInt64(&blockedRangeID))
ba := p.Request
if ba.RangeID != blockedRID {
return
}
_, ok := p.Request.GetArg(roachpb.TransferLease)
if !ok {
return
}
leaseTransferCh <- struct{}{}
<-leaseTransferCh
}

manual := hlc.NewHybridManualClock()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClockSource: manual.UnixNano,
},
Store: &kvserver.StoreTestingKnobs{
DisableConsistencyQueue: true,
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingPostEvalFilter: func(args kvserverbase.FilterArgs) *roachpb.Error {
blockWrites(args)
blockLeaseAcquisition(args)
return nil
},
},
TestingFlushFilter: blockTransfer,
},
},
}})
defer tc.Stopper().Stop(ctx)

// Upreplicate a range.
n1, n2 := tc.Servers[0], tc.Servers[1]
key := tc.ScratchRangeWithExpirationLease(t)
s1 := tc.GetFirstStoreFromServer(t, 0)
t1, t2 := tc.Target(0), tc.Target(1)
repl0 := s1.LookupReplica(keys.MustAddr(key))
desc := *repl0.Desc()
require.NotNil(t, repl0)
tc.AddVotersOrFatal(t, key, t2)
// Make sure the lease starts off on n1.
lease, _ /* now */, err := tc.FindRangeLease(desc, &t1 /* hint */)
require.NoError(t, err)
require.Equal(t, n1.NodeID(), lease.Replica.NodeID)

// Advance the time a bit. We'll then initiate a transfer, and we want the
// transferred lease to be valid for a while after the original lease expires.
manual.Pause()
remainingNanos := lease.GetExpiration().WallTime - manual.UnixNano()
// NOTE: We don't advance the clock past the mid-point of the lease, otherwise
// it gets extended.
pause1 := remainingNanos / 3
manual.Increment(pause1)

// Start a lease transfer from n1 to n2. We'll block the proposal of the transfer for a while.
atomic.StoreInt64(&blockedRangeID, int64(desc.RangeID))
transferErrCh := make(chan error)
go func() {
transferErrCh <- tc.TransferRangeLease(desc, t2)
}()
defer func() {
require.NoError(t, <-transferErrCh)
}()
// Wait for the lease transfer to evaluate and then block.
<-leaseTransferCh
// With the lease transfer still blocked, we now advance the clock beyond the
// original lease's expiration and we make n2 try to acquire a lease. This
// lease acquisition request will also be blocked.
manual.Increment(remainingNanos - pause1 + 1)
leaseAcqErrCh := make(chan error)
go func() {
_, err := n2.AcquireRangeLease(ctx, desc.RangeID)
leaseAcqErrCh <- err
}()
// Wait for the lease acquisition to be blocked.
select {
case <-leaseAcqCh:
case pErr := <-leaseAcqErrCh:
t.Fatalf("lease request unexpectedly finished. err: %v", pErr)
}
// Let the previously blocked transfer succeed. n2's lease acquisition remains
// blocked.
close(leaseTransferCh)
// Check that n2 has applied the lease transfer.
desc = *repl0.Desc()
lease, _ /* now */, err = tc.FindRangeLease(desc, &t1 /* hint */)
require.NoError(t, err)
require.Equal(t, n2.NodeID(), lease.Replica.NodeID)

// Now we send two writes. We'll block them after evaluation. Then we'll
// unblock the lease acquisition, let the respective command fail to apply,
// and then we'll unblock the writes.
err1 := make(chan error)
err2 := make(chan error)
go func() {
writeKey1.Store(key)
sender := n2.DB().NonTransactionalSender()
pArgs := putArgs(key, []byte("test val"))
_, pErr := kv.SendWrappedWith(ctx, sender, roachpb.Header{Timestamp: lease.Start.ToTimestamp()}, pArgs)
err1 <- pErr.GoError()
}()
go func() {
k := key.Next()
writeKey2.Store(k)
sender := n2.DB().NonTransactionalSender()
pArgs := putArgs(k, []byte("test val2"))
_, pErr := kv.SendWrappedWith(ctx, sender, roachpb.Header{Timestamp: lease.Start.ToTimestamp()}, pArgs)
err2 <- pErr.GoError()
}()
// Wait for the writes to evaluate and block before proposal.
<-writeCh
<-writeCh

// Unblock the lease acquisition.
close(leaseAcqCh)
require.Nil(t, <-leaseAcqErrCh)

// Now unblock the writes.
close(unblockWritesCh)
require.NoError(t, <-err1)
require.NoError(t, <-err2)
// Not crashing marks the success of this test.
}

// BenchmarkBumpSideTransportClosed measures the latency of a single call to
// (*Replica).BumpSideTransportClosed. The closed timestamp side-transport was
// designed with a performance expectation of this check taking no more than
Expand Down
48 changes: 43 additions & 5 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ type proposer interface {
prop *ProposalData,
redirectTo roachpb.ReplicaID,
)
// testingFlushFilter returns a callback installed by tests to trap commands
// being flushed. Returns nil if no callback has been installed.
testingFlushFilter() ProposalFlushFilter
}

// proposerRaft abstracts the propBuf's dependency on *raft.RawNode, to help
Expand Down Expand Up @@ -627,6 +630,9 @@ func (b *propBuf) FlushLockedWithRaftGroup(
continue
}
}
if fn := b.p.testingFlushFilter(); fn != nil {
fn(ctx, p)
}

// Coordinate proposing the command to etcd/raft.
if crt := p.command.ReplicatedEvalResult.ChangeReplicas; crt != nil {
Expand Down Expand Up @@ -726,6 +732,31 @@ func (b *propBuf) assignClosedTimestampToProposalLocked(
// For brand new leases, we close the lease start time. Since this proposing
// replica is not the leaseholder, the previous target is meaningless.
closedTSTarget = newLease.Start.ToTimestamp()
// We forward closedTSTarget to b.assignedClosedTimestamp. We surprisingly
// might have previously closed a timestamp above the lease start time -
// when we close timestamps in the future, then attempt to transfer our
// lease away (and thus proscribe it) but the transfer fails and we're now
// acquiring a new lease to replace the proscribed one.
//
// TODO(andrei,nvanbenschoten) test:
// - Acquire lease @ 1
// - Close future timestamp @ 3
// - Attempt to transfer lease @ 2
// - Reject
// - Reacquire lease @ 2
closedTSTarget.Forward(b.assignedClosedTimestamp)

// Note that we're not bumping b.assignedClosedTimestamp here (we're not
// calling forwardClosedTimestampLocked). Bumping it to the lease start time
// would (surprisingly?) be illegal: just because we're proposing a lease
// starting at timestamp 100, doesn't mean we're sure to not evaluate
// requests writing below 100. This can happen if a lease transfer has
// already applied while we were evaluating this lease request, and if we've
// already started evaluating writes under the transferred lease. Such a
// transfer can give us the lease starting at timestamp 50. If such a
// transfer applied, then our lease request that we're proposing now is sure
// to not apply. But if we were to bump b.assignedClosedTimestamp, the
// damage would be done. See TestRejectedLeaseDoesntDictateClosedTimestamp.
} else {
// Sanity check that this command is not violating the closed timestamp. It
// must be writing at a timestamp above assignedClosedTimestamp
Expand All @@ -751,15 +782,18 @@ func (b *propBuf) assignClosedTimestampToProposalLocked(
}
// We can't close timestamps above the current lease's expiration.
closedTSTarget.Backward(p.leaseStatus.ClosedTimestampUpperBound())

// We're about to close closedTSTarget. The propBuf needs to remember that
// in order for incoming requests to be bumped above it (through
// TrackEvaluatingRequest).
if !b.forwardClosedTimestampLocked(closedTSTarget) {
closedTSTarget = b.assignedClosedTimestamp
}
}

// We're about to close closedTSTarget. The propBuf needs to remember that in
// order for incoming requests to be bumped above it (through
// TrackEvaluatingRequest).
b.forwardClosedTimestampLocked(closedTSTarget)
// Fill in the closed ts in the proposal.
f := &b.tmpClosedTimestampFooter
f.ClosedTimestamp = b.assignedClosedTimestamp
f.ClosedTimestamp = closedTSTarget
footerLen := f.Size()
if log.ExpensiveLogEnabled(ctx, 4) {
log.VEventf(ctx, 4, "attaching closed timestamp %s to proposal %x", b.assignedClosedTimestamp, p.idKey)
Expand Down Expand Up @@ -1055,6 +1089,10 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) {
rp.mu.proposals[p.idKey] = p
}

func (rp *replicaProposer) testingFlushFilter() ProposalFlushFilter {
return rp.store.TestingKnobs().TestingFlushFilter
}

func (rp *replicaProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderInfo {
r := (*Replica)(rp)

Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ func (t *testProposer) rejectProposalWithRedirectLocked(
t.onRejectProposalWithRedirectLocked(prop, redirectTo)
}

func (t *testProposer) testingFlushFilter() ProposalFlushFilter {
return nil
}

// proposalCreator holds on to a lease and creates proposals using it.
type proposalCreator struct {
lease kvserverpb.LeaseStatus
Expand Down Expand Up @@ -761,6 +765,8 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
lease roachpb.Lease

expClosed hlc.Timestamp
// If not nil, b.assignedClosedTimestamp is checked against this.
expAssignedClosed *hlc.Timestamp
}{
{
name: "basic",
Expand Down Expand Up @@ -806,6 +812,11 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
leaseExp: expiredLeaseTimestamp,
rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING,
expClosed: now.ToTimestamp(),
// Check that the lease proposal does not bump b.assignedClosedTimestamp.
// The proposer cannot make promises about the write timestamps of further
// requests based on the start time of a proposed lease. See comments in
// propBuf.assignClosedTimestampToProposalLocked().
expAssignedClosed: &hlc.Timestamp{},
},
{
name: "lease extension",
Expand Down Expand Up @@ -897,6 +908,9 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
require.NoError(t, err)
require.NoError(t, b.flushLocked(ctx))
checkClosedTS(t, r, tc.expClosed)
if tc.expAssignedClosed != nil {
require.Equal(t, *tc.expAssignedClosed, b.assignedClosedTimestamp)
}
})
}
}
Expand Down
Loading

0 comments on commit 543e94e

Please sign in to comment.