kvserver: don't allow raft forwarding of lease requests
This patch aims to improve the behavior in scenarios where a follower replica is
behind, unaware of the latest lease, and it tries to acquire a lease in
its ignorant state. That lease acquisition request is bound to fail
(because the lease it's based on is stale), but until the lagging
replica finds out that it has failed, local requests are blocked. This
blocking can last for a very long time in
situations where a snapshot is needed to catch up the follower, and the
snapshot is queued up behind many other snapshots (e.g. after a node has
been down for a while and gets restarted).

This patch tries an opinionated solution: never allow followers to
acquire leases. If there is a leader, it's a better idea for the leader
to acquire the lease. The leader might have a lease anyway or, even if
it doesn't, having the leader acquire it saves a leadership transfer
(leadership follows the lease).
We change the proposal path to recognize lease requests and reject them
early if the current replica is a follower and the leader is known. The
rejection points to the leader, which causes the request that triggered
the lease acquisition to be redirected to the leader, where it triggers
a lease acquisition on that replica instead.
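
In rough sketch form, the new check in propBuf.FlushLockedWithRaftGroup
(simplified from the diff below; nil checks, locking, and the sanity check
that a known leader isn't this same replica are elided) looks like:

    status := raftGroup.BasicStatus()
    iAmTheLeader := status.RaftState == raft.StateLeader
    leaderKnown := status.Lead != raft.None

    // For each pending proposal p in the buffer:
    if !iAmTheLeader && p.Request.IsLeaseRequest() {
        if leaderKnown {
            // Reject the lease acquisition and redirect the caller to the
            // leader via a NotLeaseHolderError with a speculative lease.
            b.p.rejectProposalWithRedirectLocked(ctx, p, roachpb.ReplicaID(status.Lead))
            continue
        }
        // Leader unknown: propose anyway; Raft drops the proposal
        // (ErrProposalDropped) and it is re-proposed once a leader emerges.
    }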

The next commit takes this further by short-circuiting the lease
proposal even sooner, but that patch is more of a best-effort measure.

Fixes #37906

Release note: A bug causing queries sent to a freshly-restarted node to
sometimes hang for a long time while the node catches up with
replication has been fixed.
andreimatei committed Oct 14, 2020
1 parent 26054bd commit b31e727
Showing 5 changed files with 272 additions and 14 deletions.
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/client_raft_helpers_test.go
@@ -22,7 +22,9 @@ import (
)

type unreliableRaftHandlerFuncs struct {
// If non-nil, can return false to avoid dropping a msg to rangeID.
// If non-nil, can return false to avoid dropping the msg to
// unreliableRaftHandler.rangeID. If nil, all messages pertaining to the
// respective range are dropped.
dropReq func(*kvserver.RaftMessageRequest) bool
dropHB func(*kvserver.RaftHeartbeat) bool
dropResp func(*kvserver.RaftMessageResponse) bool
180 changes: 171 additions & 9 deletions pkg/kv/kvserver/client_raft_test.go
@@ -903,6 +903,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
// x x
// [1]<---->[2]
//
log.Infof(ctx, "test: installing unreliable Raft transports")
for _, s := range []int{0, 1, 2} {
h := &unreliableRaftHandler{rangeID: 1, RaftMessageHandler: mtc.stores[s]}
if s != partStore {
@@ -922,6 +923,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
// not succeed before their context is canceled, but they will be appended
// to the partitioned replica's Raft log because it is currently the Raft
// leader.
log.Infof(ctx, "test: sending writes to partitioned replica")
g := ctxgroup.WithContext(ctx)
for i := 0; i < 32; i++ {
otherKey := roachpb.Key(fmt.Sprintf("other-%d", i))
@@ -942,26 +944,45 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
}

// Transfer the lease to one of the followers and perform a write. The
// partition ensures that this will require a Raft leadership change.
const newLeaderStore = partStore + 1
newLeaderRepl, err := mtc.stores[newLeaderStore].GetReplica(1)
if err != nil {
t.Fatal(err)
}
newLeaderReplSender := mtc.stores[newLeaderStore].TestSender()

// partition ensures that this will require a Raft leadership change. It's
unpredictable which one of the followers will become leader. Only the
leader will be allowed to acquire the lease (see
TestRequestsOnLaggingReplica), so it's also unpredictable
// who will get the lease. We try repeatedly sending requests to both
// candidates until one of them succeeds.
var nonPartitionedSenders [2]kv.Sender
nonPartitionedSenders[0] = mtc.stores[1].TestSender()
nonPartitionedSenders[1] = mtc.stores[2].TestSender()

log.Infof(ctx, "test: sending write to transfer lease")
incArgs = incrementArgs(key, incB)
var i int
var newLeaderRepl *kvserver.Replica
var newLeaderReplSender kv.Sender
testutils.SucceedsSoon(t, func() error {
mtc.advanceClock(ctx)
_, pErr := kv.SendWrapped(ctx, newLeaderReplSender, incArgs)
i++
sender := nonPartitionedSenders[i%2]
_, pErr := kv.SendWrapped(ctx, sender, incArgs)
if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok {
return pErr.GoError()
} else if pErr != nil {
t.Fatal(pErr)
}

// A request succeeded, proving that there is a new leader and leaseholder.
// Remember who that is.
newLeaderStoreIdx := 1 + (i % 2)
newLeaderRepl, err = mtc.stores[newLeaderStoreIdx].GetReplica(1)
if err != nil {
t.Fatal(err)
}
newLeaderReplSender = mtc.stores[newLeaderStoreIdx].TestSender()
return nil
})
log.Infof(ctx, "test: waiting for values...")
mtc.waitForValues(key, []int64{incA, incAB, incAB})
log.Infof(ctx, "test: waiting for values... done")

index, err := newLeaderRepl.GetLastIndex()
if err != nil {
@@ -970,6 +991,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {

// Truncate the log at index+1 (log entries < N are removed, so this
// includes the increment).
log.Infof(ctx, "test: truncating log")
truncArgs := truncateLogArgs(index+1, 1)
testutils.SucceedsSoon(t, func() error {
mtc.advanceClock(ctx)
@@ -986,6 +1008,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
snapsBefore := snapsMetric.Count()

// Remove the partition. Snapshot should follow.
log.Infof(ctx, "test: removing the partition")
for _, s := range []int{0, 1, 2} {
mtc.transport.Listen(mtc.stores[s].Ident.StoreID, &unreliableRaftHandler{
rangeID: 1,
@@ -1024,6 +1047,145 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
mtc.waitForValues(key, []int64{incABC, incABC, incABC})
}

// TestRequestsOnLaggingReplica tests that requests sent to a replica that's
// behind in log application don't block. The test verifies that such a
// follower does not attempt to acquire a lease (an attempt which would block
// until the replica figured out that it cannot, in fact, take the lease) and
// that the request is instead rejected with a NotLeaseHolderError pointing to
// the Raft leader.
func TestRequestsOnLaggingReplica(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
mtc := &multiTestContext{
// This test only cares about a single range.
startWithSingleRange: true,
}
defer mtc.Stop()
mtc.Start(t, 3)

// Add replicas on all the stores.
mtc.replicateRange(1 /* rangeID */, 1, 2 /* dests */)

// Partition the original leader from its followers. We do this by installing
// unreliableRaftHandler listeners on all three Stores. The handler on the
// partitioned store filters out all messages while the handler on the other
// two stores only filters out messages from the partitioned store. The
// configuration looks like:
//
// [0]
// x x
// / \
// x x
// [1]<---->[2]
//
log.Infof(ctx, "test: installing unreliable Raft transports")
const partStoreIdx = 0
partStore := mtc.stores[partStoreIdx]
partRepl, err := partStore.GetReplica(1)
require.NoError(t, err)
partReplDesc, err := partRepl.GetReplicaDescriptor()
require.NoError(t, err)
partReplSender := partStore.TestSender()
const otherStoreIdx = 1
otherStore := mtc.stores[otherStoreIdx]
otherRepl, err := otherStore.GetReplica(1)
require.NoError(t, err)

for _, i := range []int{0, 1, 2} {
h := &unreliableRaftHandler{rangeID: 1, RaftMessageHandler: mtc.stores[i]}
if i != partStoreIdx {
// Only filter messages from the partitioned store on the other
// two stores.
h.dropReq = func(req *kvserver.RaftMessageRequest) bool {
return req.FromReplica.StoreID == partRepl.StoreID()
}
h.dropHB = func(hb *kvserver.RaftHeartbeat) bool {
return hb.FromReplicaID == partReplDesc.ReplicaID
}
}
mtc.transport.Listen(mtc.stores[i].Ident.StoreID, h)
}
// Wait until the leadership is transferred away from the partitioned replica.
testutils.SucceedsSoon(t, func() error {
mtc.advanceClock(ctx)
lead := otherRepl.RaftStatus().Lead
if lead == raft.None {
return errors.New("no leader yet")
}
if roachpb.ReplicaID(lead) == partReplDesc.ReplicaID {
return errors.New("partitioned replica is still leader")
}
return nil
})

leaderReplicaID := roachpb.ReplicaID(otherRepl.RaftStatus().Lead)
log.Infof(ctx, "test: the leader is replica ID %d", leaderReplicaID)
if leaderReplicaID != 2 && leaderReplicaID != 3 {
t.Fatalf("expected leader to be 1 or 2, was: %d", leaderReplicaID)
}
leaderSender := mtc.stores[leaderReplicaID-1].TestSender()

// Write something to generate some Raft log entries and then truncate the log.
log.Infof(ctx, "test: incrementing")
keyA := roachpb.Key("a")
incArgs := incrementArgs(keyA, 1)
_, pErr := kv.SendWrapped(ctx, leaderSender, incArgs)
require.Nil(t, pErr)
log.Infof(ctx, "test: waiting for values...")
mtc.waitForValues(keyA, []int64{0, 1, 1})
log.Infof(ctx, "test: waiting for values... done")
index, err := otherRepl.GetLastIndex()
require.NoError(t, err)

// Truncate the log at index+1 (log entries < N are removed, so this includes
// the increment). This means that the partitioned replica will need a
// snapshot to catch up.
log.Infof(ctx, "test: truncating log...")
truncArgs := truncateLogArgs(index+1, 1)
testutils.SucceedsSoon(t, func() error {
mtc.advanceClock(ctx)
_, pErr := kv.SendWrapped(ctx, leaderSender, truncArgs)
if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok {
return pErr.GoError()
} else if pErr != nil {
t.Fatal(pErr)
}
return nil
})
log.Infof(ctx, "test: truncating log... done")

// TODO: assert that the partitioned replica's lease has expired.

// Resolve the partition, but continue blocking snapshots destined for the
// previously-partitioned replica. The point of blocking the snapshots is to
// prevent the respective replica from catching up and becoming eligible to
// become the leader/leaseholder. The point of resolving the partition is to
// allow the replica in question to figure out that it's not the leader any
// more. As long as it is completely partitioned, the replica continues
// believing that it is the leader, and lease acquisition requests block.
slowSnapHandler := &slowSnapRaftHandler{
rangeID: 1,
waitCh: make(chan struct{}),
RaftMessageHandler: partStore,
}
defer slowSnapHandler.unblock()
mtc.transport.Listen(partStore.Ident.StoreID, slowSnapHandler)

// Now we're going to send a request to the behind replica, and we expect it
// to not block; we expect a redirection to the leader.
log.Infof(ctx, "test: sending request")
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
getRequest := getArgs(keyA)
_, pErr = kv.SendWrapped(timeoutCtx, partReplSender, getRequest)
if pErr == nil {
t.Fatalf("unexpected success")
}
nlhe := pErr.Detail.GetNotLeaseHolder()
require.NotNil(t, nlhe, "expected NotLeaseholderError, got: %s", pErr)
require.Equal(t, leaderReplicaID, nlhe.LeaseHolder.ReplicaID)
}

type fakeSnapshotStream struct {
nextReq *kvserver.SnapshotRequest
nextErr error
11 changes: 7 additions & 4 deletions pkg/kv/kvserver/client_test.go
@@ -904,6 +904,8 @@ func (m *multiTestContext) addStore(idx int) {
nodeID := roachpb.NodeID(idx + 1)
cfg := m.makeStoreConfig(idx)
ambient := log.AmbientContext{Tracer: cfg.Settings.Tracer}
ambient.AddLogTag("n", nodeID)

m.populateDB(idx, cfg.Settings, stopper)
nlActive, nlRenewal := cfg.NodeLivenessDurations()
m.nodeLivenesses[idx] = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{
@@ -1263,7 +1265,8 @@ func (m *multiTestContext) changeReplicas(
return desc.NextReplicaID, nil
}

// replicateRange replicates the given range onto the given stores.
// replicateRange replicates the given range onto the given destination stores. The destinations
// are indicated by indexes within m.stores.
func (m *multiTestContext) replicateRange(rangeID roachpb.RangeID, dests ...int) {
m.t.Helper()
if err := m.replicateRangeNonFatal(rangeID, dests...); err != nil {
@@ -1388,9 +1391,9 @@ func (m *multiTestContext) waitForValuesT(t testing.TB, key roachpb.Key, expecte
})
}

// waitForValues waits up to the given duration for the integer values
// at the given key to match the expected slice (across all engines).
// Fails the test if they do not match.
// waitForValues waits for the integer values at the given key to match the
// expected slice (across all engines). Fails the test if they do not match
// after the SucceedsSoon period.
func (m *multiTestContext) waitForValues(key roachpb.Key, expected []int64) {
m.t.Helper()
m.waitForValuesT(m.t, key, expected)
85 changes: 85 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf.go
@@ -158,6 +158,18 @@ type proposer interface {
// The following require the proposer to hold an exclusive lock.
withGroupLocked(func(*raft.RawNode) error) error
registerProposalLocked(*ProposalData)
// rejectProposalWithRedirectLocked rejects a proposal and redirects the
// proposer to try it on another node. This is used to sometimes reject lease
// acquisitions when another replica is the leader; the intended consequence
// of the rejection is that the request that caused the lease acquisition
// attempt is tried on the leader, at which point it should result in a lease
// acquisition attempt by that node (or, perhaps by then the leader will have
// already gotten a lease and the request can be serviced directly).
rejectProposalWithRedirectLocked(
ctx context.Context,
prop *ProposalData,
redirectTo roachpb.ReplicaID,
)
}

// Init initializes the proposal buffer and binds it to the provided proposer.
@@ -399,6 +411,26 @@ func (b *propBuf) FlushLockedWithRaftGroup(
// and apply them.
buf := b.arr.asSlice()[:used]
ents := make([]raftpb.Entry, 0, used)

// Figure out leadership info. We'll use it to conditionally drop some
// requests.
var iAmTheLeader bool
var leaderKnown bool
var leader roachpb.ReplicaID
if raftGroup != nil {
status := raftGroup.BasicStatus()
iAmTheLeader = status.RaftState == raft.StateLeader
leaderKnown = status.Lead != raft.None
if leaderKnown {
leader = roachpb.ReplicaID(status.Lead)
if !iAmTheLeader && leader == b.p.replicaID() {
log.Fatalf(ctx,
"inconsistent Raft state: state %s while the current replica is also the lead: %d",
status.RaftState, leader)
}
}
}

// Remember the first error that we see when proposing the batch. We don't
// immediately return this error because we want to finish clearing out the
// buffer and registering each of the proposals with the proposer, but we
@@ -412,6 +444,41 @@ func (b *propBuf) FlushLockedWithRaftGroup(
}
buf[i] = nil // clear buffer

// Handle an edge case about lease acquisitions: we don't want to forward
// lease acquisitions to another node (which is what happens when we're not
// the leader) because:
// a) if there is a different leader, that leader should acquire the lease
// itself and thus avoid a change of leadership caused by the leaseholder
// and leader being different (Raft leadership follows the lease), and
// b) being a follower, it's possible that this replica is behind in
// applying the log. Thus, there might be another lease in place that this
// follower doesn't know about, in which case the lease we're proposing here
// would be rejected. Not only would proposing such a lease be wasted work,
// but we're trying to protect against pathological cases where it takes a
// long time for this follower to catch up (for example because it's waiting
// for a snapshot, and the snapshot is queued behind many other snapshots).
// In such a case, we don't want all requests arriving at this node to be
// blocked on this lease acquisition (which is very likely to eventually
// fail anyway).
//
// Thus, we do one of two things:
// - if the leader is known, we reject this proposal and make sure the
// request that needed the lease is redirected to the leaseholder;
// - if the leader is not known, we don't do anything special here to
// terminate the proposal, but we know that Raft will reject it with an
// ErrProposalDropped. We'll eventually re-propose it once a leader is
// known, at which point it will either go through or be rejected based on
// whether or not it is this replica that became the leader.
if !iAmTheLeader && p.Request.IsLeaseRequest() {
if leaderKnown {
log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is",
leader)
b.p.rejectProposalWithRedirectLocked(ctx, p, leader)
continue
}
// If the leader is not known, continue with the proposal as explained above.
}

// Raft processing bookkeeping.
b.p.registerProposalLocked(p)

@@ -638,3 +705,21 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) {
p.proposedAtTicks = rp.mu.ticks
rp.mu.proposals[p.idKey] = p
}

// rejectProposalWithRedirectLocked is part of the proposer interface.
func (rp *replicaProposer) rejectProposalWithRedirectLocked(
ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID,
) {
r := (*Replica)(rp)
rangeDesc := r.descRLocked()
storeID := r.store.StoreID()
leaderRep, _ /* ok */ := rangeDesc.GetReplicaDescriptorByID(redirectTo)
speculativeLease := &roachpb.Lease{
Replica: leaderRep,
}
log.VEventf(ctx, 2, "redirecting proposal to node %s; request: %s", leaderRep.NodeID, prop.Request)
r.cleanupFailedProposalLocked(prop)
prop.finishApplication(ctx, proposalResult{
Err: roachpb.NewError(newNotLeaseHolderError(speculativeLease, storeID, rangeDesc)),
})
}
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -71,6 +71,12 @@ func (t *testProposer) registerProposalLocked(p *ProposalData) {
t.registered++
}

func (t *testProposer) rejectProposalWithRedirectLocked(
ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID,
) {
panic("unimplemented")
}

func newPropData(leaseReq bool) (*ProposalData, []byte) {
var ba roachpb.BatchRequest
if leaseReq {