Skip to content

Commit

Permalink
kvserver: fail stale ConfChange when rejected by raft
Browse files Browse the repository at this point in the history
Because etcd/raft activates configuration changes when they are applied, but checks new proposed configs before they are considered for adding to the log (or forwarding to the leader), the following can happen:

- conf change 1 gets evaluated on a leaseholder n1
- lease changes
- new leaseholder evaluates and commits conf change 2
- n1 receives and applies conf change 2
- conf change 1 gets added to the proposal buffer and flushed; RawNode rejects
  it because conf change 1 is not compatible on top of conf change 2

Prior to this commit, because raft silently replaces the conf change with an
empty entry, we would never see the proposal below raft (where it would be
rejected due to the lease change). In effect, this caused replica unavailability
because the proposal and the associated latch would stick around forever, and the
replica circuit breaker would trip.

This commit provides a targeted fix: when the proposal buffer flushes a conf
change to raft, we check if it got replaced with an empty entry. If so, we
properly finish the proposal. To be conservative, we signal it with an ambiguous
result: it seems conceivable that the rejection would only occur on a
reproposal, while the original proposal made it into raft before the lease
change, and the local replica is in fact behind on conf changes rather than
ahead (which can happen if it's a follower). The only "customer" here is the
replicate queue (and scatter, etc) so this is acceptable; any choice here would
necessarily be a "hard error" anyway.

Epic: CRDB-25287
Release note (bug fix): under rare circumstances, a replication change could get
stuck when proposed near lease/leadership changes (and likely under overload),
and the replica circuit breakers could trip. This problem has been addressed.

Fixes #105797.
Closes #104709.
  • Loading branch information
tbg committed Jul 5, 2023
1 parent 4e050d0 commit 93117db
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 1 deletion.
61 changes: 61 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -6933,3 +6934,63 @@ func TestStoreMetricsOnIncomingOutgoingMsg(t *testing.T) {
require.Equal(t, expected, actual)
})
}

// TestInvalidConfChangeRejection is a regression test for [1]. See also
// TestProposalBufferRejectStaleChangeReplicasConfChange for a unit test.
//
// [1]: https://github.com/cockroachdb/cockroach/issues/105797
func TestInvalidConfChangeRejection(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// This is a regression test against a stuck command, so set a timeout to get
// a shot at a graceful failure on regression.
ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration)
defer cancel()

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ReplicationMode: base.ReplicationManual})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)

repl := tc.GetFirstStoreFromServer(t, 0).LookupReplica(keys.MustAddr(k))

// Try to leave a joint config even though we're not in one. This is something
// that will lead raft to propose an empty entry instead of our conf change.
//
// See: https://github.com/cockroachdb/cockroach/issues/105797
var ba kvpb.BatchRequest
now := tc.Server(0).Clock().Now()
txn := roachpb.MakeTransaction("fake", k, isolation.Serializable, roachpb.NormalUserPriority, now, 500*time.Millisecond.Nanoseconds(), 1)
ba.Txn = &txn
ba.Timestamp = now
ba.Add(&kvpb.EndTxnRequest{
RequestHeader: kvpb.RequestHeader{
Key: k,
},
Commit: true,
InternalCommitTrigger: &roachpb.InternalCommitTrigger{
ChangeReplicasTrigger: &roachpb.ChangeReplicasTrigger{
Desc: repl.Desc(),
},
},
})

_, pErr := repl.Send(ctx, &ba)
require.ErrorContains(t, pErr.GoError(), `config change rejected by raft`)

// We didn't leak the latch.
_, err := tc.Servers[0].DB().Get(ctx, k)
require.NoError(t, err)

// Double check that we don't have a proposal in the map. (We may not have
// leaked the latch, but still leaked the proposal). This is morally always
// zero, but since it's a TestCluster guard against another random request
// blipping in.
testutils.SucceedsSoon(t, func() error {
if n := repl.State(ctx).NumPending; n > 0 {
return errors.Errorf("%d proposals pending", n)
}
return nil
})
}
27 changes: 27 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ type proposer interface {
lease *roachpb.Lease,
reason raftutil.ReplicaNeedsSnapshotStatus,
)
// rejectProposalWithErrLocked rejects the proposal with the given error. This
// should only be used if none of the other `rejectProposalX` methods apply.
rejectProposalWithErrLocked(
ctx context.Context, prop *ProposalData, pErr *kvpb.Error,
)

// leaseDebugRLocked returns info on the current lease.
leaseDebugRLocked() string
Expand Down Expand Up @@ -581,6 +586,28 @@ func (b *propBuf) FlushLockedWithRaftGroup(
firstErr = err
continue
}
if msg.Entries[0].Type == raftpb.EntryNormal {
// If we are trying to commit a ChangeReplicas but the lease has since
// changed, it's possible that the new leaseholder has already committed
// additional replication changes that our RawNode has applied. In that
// case, its config may be incompatible with what our stale
// ChangeReplicas wants to do, and raft will "reject" it by proposing an
// empty entry instead. We don't need this protection since it is
// conferred by our below-raft checks (plus, on the leaseholder, latches
// that linearize application of replication changes). In fact, it's
// detrimental, because an empty entry doesn't map back to our inflight
// proposal, and so in effect the stale ChangeReplicas will never "show
// up" as replicated, essentially leaking a proposal and the associated
// latches. We could disable the raft check, but there currently isn't
// an option for that. Instead, we detect when RawNode has replaced our
// entry with a "normal" entry and terminate the proposal here (note
// that it is currently not in the proposals map).
b.p.rejectProposalWithErrLocked(ctx, p, kvpb.NewErrorf(`config change rejected by raft; please retry`))
// At the time of writing, this is superfluous, but if any code gets
// added at the end of the loop likely it shouldn't affect this
// proposal since we're skipping it.
continue
}
} else {
// Add to the batch of entries that will soon be proposed. It is
// possible that this batching can cause the batched MsgProp to grow
Expand Down
96 changes: 95 additions & 1 deletion pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
Expand All @@ -34,7 +35,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -65,6 +65,7 @@ type testProposer struct {
// If nil, rejectProposalWithLeaseTransferRejectedLocked() panics.
onRejectProposalWithLeaseTransferRejectedLocked func(
lease *roachpb.Lease, reason raftutil.ReplicaNeedsSnapshotStatus)
onRejectProposalWithErrLocked func(*ProposalData, *kvpb.Error)
// validLease is returned by ownsValidLease()
validLease bool
// leaderNotLive is returned from shouldCampaignOnRedirect().
Expand All @@ -91,6 +92,7 @@ type testProposerRaft struct {
// proposals are the commands that the propBuf flushed (i.e. passed to the
// Raft group) and have not yet been consumed with consumeProposals().
proposals []kvserverpb.RaftCommand
onProp func(raftpb.Message) // invoked on Step with MsgProp
campaigned bool
}

Expand All @@ -100,6 +102,9 @@ func (t *testProposerRaft) Step(msg raftpb.Message) error {
if msg.Type != raftpb.MsgProp {
return nil
}
if t.onProp != nil {
t.onProp(msg)
}
// Decode and save all the commands.
for _, e := range msg.Entries {
ent, err := raftlog.NewEntry(e)
Expand Down Expand Up @@ -260,6 +265,15 @@ func (t *testProposer) rejectProposalWithLeaseTransferRejectedLocked(
t.onRejectProposalWithLeaseTransferRejectedLocked(lease, reason)
}

func (t *testProposer) rejectProposalWithErrLocked(
_ context.Context, prop *ProposalData, pErr *kvpb.Error,
) {
if t.onRejectProposalWithErrLocked == nil {
panic("unexpected rejectProposalWithErrLocked() call")
}
t.onRejectProposalWithErrLocked(prop, pErr)
}

// proposalCreator holds on to a lease and creates proposals using it.
type proposalCreator struct {
lease kvserverpb.LeaseStatus
Expand Down Expand Up @@ -287,12 +301,19 @@ func (pc proposalCreator) newLeaseTransferProposal(lease roachpb.Lease) *Proposa
func (pc proposalCreator) newProposal(ba *kvpb.BatchRequest) *ProposalData {
var lease *roachpb.Lease
var isLeaseRequest bool
var cr *kvserverpb.ChangeReplicas
switch v := ba.Requests[0].GetInner().(type) {
case *kvpb.RequestLeaseRequest:
lease = &v.Lease
isLeaseRequest = true
case *kvpb.TransferLeaseRequest:
lease = &v.Lease
case *kvpb.EndTxnRequest:
if crt := v.InternalCommitTrigger.GetChangeReplicasTrigger(); crt != nil {
cr = &kvserverpb.ChangeReplicas{
ChangeReplicasTrigger: *crt,
}
}
}
p := &ProposalData{
ctx: context.Background(),
Expand All @@ -301,6 +322,7 @@ func (pc proposalCreator) newProposal(ba *kvpb.BatchRequest) *ProposalData {
ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{
IsLeaseRequest: isLeaseRequest,
State: &kvserverpb.ReplicaState{Lease: lease},
ChangeReplicas: cr,
},
},
Request: ba,
Expand Down Expand Up @@ -798,6 +820,78 @@ func TestProposalBufferRejectUnsafeLeaseTransfer(t *testing.T) {
}
}

// TestProposalBufferRejectStaleChangeReplicasConfChange is a regression test
// for [1]. See also TestInvalidConfChangeRejection for an end-to-end test.
//
// [1]: https://github.com/cockroachdb/cockroach/issues/105797
func TestProposalBufferRejectStaleChangeReplicasConfChange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

proposer := uint64(1)
proposerFirstIndex := kvpb.RaftIndex(5)

var p testProposer
var pc proposalCreator
require.Equal(t, proposer, uint64(p.getReplicaID()))

var seenErr *kvpb.Error
p.onRejectProposalWithErrLocked = func(proposalData *ProposalData, pErr *kvpb.Error) {
require.NotNil(t, pErr)
seenErr = pErr
}

r := &testProposerRaft{
onProp: func(msg raftpb.Message) {
// Mimic what RawNode does when it gets a conf change that isn't
// compatible with its active config: proposing an empty entry instead. In
// practice, because the config is set when applying commands, this can
// happen when a stale ChangeReplicas is proposed but the RawNode has
// already applied newer config changes.
//
// See https://github.com/etcd-io/raft/blob/4abd9e927c6d5db930dfdb80237ac584449aeec7/raft.go#L1254-L1257.
if msg.Entries[0].Type == raftpb.EntryConfChangeV2 {
msg.Entries[0] = raftpb.Entry{Type: raftpb.EntryNormal}
}
},
}
p.raftGroup = r
p.fi = proposerFirstIndex

var b propBuf
clock := hlc.NewClockForTesting(nil)
tr := tracker.NewLockfreeTracker()
b.Init(&p, tr, clock, cluster.MakeTestingClusterSettings())

k := keys.LocalMax // unimportant
var ba kvpb.BatchRequest
ba.Add(&kvpb.EndTxnRequest{
RequestHeader: kvpb.RequestHeader{
Key: k,
},
Commit: true,
InternalCommitTrigger: &roachpb.InternalCommitTrigger{
ChangeReplicasTrigger: &roachpb.ChangeReplicasTrigger{
Desc: roachpb.NewRangeDescriptor(1, roachpb.RKeyMin, roachpb.RKeyMax,
roachpb.MakeReplicaSet([]roachpb.ReplicaDescriptor{{NodeID: 1, StoreID: 1, ReplicaID: 1}}),
),
},
},
})
pd := pc.newProposal(&ba)

_, tok := b.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
err := b.Insert(ctx, pd, tok.Move(ctx))
require.NoError(t, err)
require.NoError(t, b.flushLocked(ctx))
require.ErrorContains(t, seenErr.GoError(), `config change rejected by raft`)
// NB: we don't check that the proposals map is empty because the test harness
// currently doesn't do it (we'd really be testing the test harness only
// anyway). We have coverage for this end-to-end through
// TestInvalidConfChangeRejection, though.
}

// Test that the propBuf properly assigns closed timestamps to proposals being
// flushed out of it. Each subtest proposes one command and checks for the
// expected closed timestamp being written to the proposal by the propBuf.
Expand Down

0 comments on commit 93117db

Please sign in to comment.