Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: kvserver: fail stale ConfChange when rejected by raft #106147

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
Expand Down Expand Up @@ -5976,3 +5977,63 @@ func TestRaftSnapshotsWithMVCCRangeKeysEverywhere(t *testing.T) {
require.Equal(t, kvpb.CheckConsistencyResponse_RANGE_CONSISTENT, result.Status, "%+v", result)
}
}

// TestInvalidConfChangeRejection is the end-to-end regression test for [1]: a
// ChangeReplicas proposal that raft refuses must fail promptly instead of
// hanging and leaking its latches and proposal. The unit-level counterpart is
// TestProposalBufferRejectStaleChangeReplicasConfChange.
//
// [1]: https://github.com/cockroachdb/cockroach/issues/105797
func TestInvalidConfChangeRejection(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// The failure mode being guarded against is a command that never returns,
	// so bound the test with a timeout to fail gracefully on regression.
	ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration)
	defer cancel()

	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ReplicationMode: base.ReplicationManual})
	defer tc.Stopper().Stop(ctx)

	scratchKey := tc.ScratchRange(t)
	replica := tc.GetFirstStoreFromServer(t, 0).LookupReplica(keys.MustAddr(scratchKey))

	// Build an EndTxn whose commit trigger asks to leave a joint config even
	// though the range is not in one. Raft responds to this by proposing an
	// empty entry in place of our conf change.
	//
	// See: https://github.com/cockroachdb/cockroach/issues/105797
	endTxn := &kvpb.EndTxnRequest{
		RequestHeader: kvpb.RequestHeader{Key: scratchKey},
		Commit:        true,
		InternalCommitTrigger: &roachpb.InternalCommitTrigger{
			ChangeReplicasTrigger: &roachpb.ChangeReplicasTrigger{
				Desc: replica.Desc(),
			},
		},
	}

	now := tc.Server(0).Clock().Now()
	txn := roachpb.MakeTransaction(
		"fake", scratchKey, isolation.Serializable, roachpb.NormalUserPriority,
		now, 500*time.Millisecond.Nanoseconds(), 1,
	)

	var ba kvpb.BatchRequest
	ba.Txn = &txn
	ba.Timestamp = now
	ba.Add(endTxn)

	_, pErr := replica.Send(ctx, &ba)
	require.ErrorContains(t, pErr.GoError(), `config change rejected by raft`)

	// A follow-up read on the same key only succeeds if the rejected request
	// released its latch.
	_, err := tc.Servers[0].DB().Get(ctx, scratchKey)
	require.NoError(t, err)

	// Also verify that no proposal was left behind in the proposals map (the
	// latch could have been released while the proposal still leaked). The
	// count should already be zero, but since this is a TestCluster, retry to
	// tolerate an unrelated request briefly showing up.
	testutils.SucceedsSoon(t, func() error {
		pending := replica.State(ctx).NumPending
		if pending <= 0 {
			return nil
		}
		return errors.Errorf("%d proposals pending", pending)
	})
}
43 changes: 39 additions & 4 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ type proposer interface {
lease *roachpb.Lease,
reason raftutil.ReplicaNeedsSnapshotStatus,
)
// rejectProposalWithErrLocked rejects the proposal with the given error. This
// should only be used if none of the other `rejectProposalX` methods apply.
rejectProposalWithErrLocked(
ctx context.Context, prop *ProposalData, pErr *kvpb.Error,
)

// leaseDebugRLocked returns info on the current lease.
leaseDebugRLocked() string
Expand All @@ -185,7 +190,6 @@ type proposerRaft interface {
Step(raftpb.Message) error
Status() raft.Status
BasicStatus() raft.BasicStatus
ProposeConfChange(raftpb.ConfChangeI) error
Campaign() error
}

Expand Down Expand Up @@ -512,16 +516,47 @@ func (b *propBuf) FlushLockedWithRaftGroup(
continue
}

if err := raftGroup.ProposeConfChange(
cc,
); err != nil && !errors.Is(err, raft.ErrProposalDropped) {
typ, data, err := raftpb.MarshalConfChange(cc)
if err != nil {
firstErr = err
continue
}
msg := raftpb.Message{Type: raftpb.MsgProp, Entries: []raftpb.Entry{
{
Type: typ,
Data: data,
},
}}
if err := raftGroup.Step(msg); err != nil && !errors.Is(err, raft.ErrProposalDropped) {
// Silently ignore dropped proposals (they were always silently
// ignored prior to the introduction of ErrProposalDropped).
// TODO(bdarnell): Handle ErrProposalDropped better.
// https://github.com/cockroachdb/cockroach/issues/21849
firstErr = err
continue
}
if msg.Entries[0].Type == raftpb.EntryNormal {
// If we are trying to commit a ChangeReplicas but the lease has since
// changed, it's possible that the new leaseholder has already committed
// additional replication changes that our RawNode has applied. In that
// case, its config may be incompatible with what our stale
// ChangeReplicas wants to do, and raft will "reject" it by proposing an
// empty entry instead. We don't need this protection since it is
// conferred by our below-raft checks (plus, on the leaseholder, latches
// that linearize application of replication changes). In fact, it's
// detrimental, because an empty entry doesn't map back to our inflight
// proposal, and so in effect the stale ChangeReplicas will never "show
// up" as replicated, essentially leaking a proposal and the associated
// latches. We could disable the raft check, but there currently isn't
// an option for that. Instead, we detect when RawNode has replaced our
// entry with a "normal" entry and terminate the proposal here (note
// that it is currently not in the proposals map).
b.p.rejectProposalWithErrLocked(ctx, p, kvpb.NewErrorf(`config change rejected by raft; please retry`))
// At the time of writing, this `continue` is superfluous, but if any code
// gets added at the end of the loop, it likely shouldn't run for this
// proposal since we're skipping it.
continue
}
} else {
// Add to the batch of entries that will soon be proposed. It is
// possible that this batching can cause the batched MsgProp to grow
Expand Down
111 changes: 98 additions & 13 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
Expand All @@ -30,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -61,6 +61,7 @@ type testProposer struct {
// If nil, rejectProposalWithLeaseTransferRejectedLocked() panics.
onRejectProposalWithLeaseTransferRejectedLocked func(
lease *roachpb.Lease, reason raftutil.ReplicaNeedsSnapshotStatus)
onRejectProposalWithErrLocked func(*ProposalData, *kvpb.Error)
// validLease is returned by ownsValidLease()
validLease bool
// leaderNotLive is returned from shouldCampaignOnRedirect().
Expand All @@ -87,6 +88,7 @@ type testProposerRaft struct {
// proposals are the commands that the propBuf flushed (i.e. passed to the
// Raft group) and have not yet been consumed with consumeProposals().
proposals []kvserverpb.RaftCommand
onProp func(raftpb.Message) // invoked on Step with MsgProp
campaigned bool
}

Expand All @@ -96,6 +98,9 @@ func (t *testProposerRaft) Step(msg raftpb.Message) error {
if msg.Type != raftpb.MsgProp {
return nil
}
if t.onProp != nil {
t.onProp(msg)
}
// Decode and save all the commands.
for _, e := range msg.Entries {
ent, err := raftlog.NewEntry(e)
Expand All @@ -122,11 +127,6 @@ func (t testProposerRaft) BasicStatus() raft.BasicStatus {
return t.status.BasicStatus
}

// ProposeConfChange is a test stub: it ignores the supplied conf change and
// always returns nil, so nothing is recorded about what was proposed.
func (t testProposerRaft) ProposeConfChange(i raftpb.ConfChangeI) error {
// TODO(andrei, nvanbenschoten): Capture the message and test against it.
return nil
}

func (t *testProposerRaft) Campaign() error {
t.campaigned = true
return nil
Expand Down Expand Up @@ -250,6 +250,15 @@ func (t *testProposer) rejectProposalWithLeaseTransferRejectedLocked(
t.onRejectProposalWithLeaseTransferRejectedLocked(lease, reason)
}

// rejectProposalWithErrLocked forwards the rejected proposal and its error to
// the onRejectProposalWithErrLocked hook. It panics when no hook is installed,
// so that tests which do not expect a rejection fail loudly.
func (t *testProposer) rejectProposalWithErrLocked(
	_ context.Context, prop *ProposalData, pErr *kvpb.Error,
) {
	if hook := t.onRejectProposalWithErrLocked; hook != nil {
		hook(prop, pErr)
		return
	}
	panic("unexpected rejectProposalWithErrLocked() call")
}

// proposalCreator holds on to a lease and creates proposals using it.
type proposalCreator struct {
lease kvserverpb.LeaseStatus
Expand Down Expand Up @@ -277,12 +286,19 @@ func (pc proposalCreator) newLeaseTransferProposal(lease roachpb.Lease) *Proposa
func (pc proposalCreator) newProposal(ba *kvpb.BatchRequest) *ProposalData {
var lease *roachpb.Lease
var isLeaseRequest bool
var cr *kvserverpb.ChangeReplicas
switch v := ba.Requests[0].GetInner().(type) {
case *kvpb.RequestLeaseRequest:
lease = &v.Lease
isLeaseRequest = true
case *kvpb.TransferLeaseRequest:
lease = &v.Lease
case *kvpb.EndTxnRequest:
if crt := v.InternalCommitTrigger.GetChangeReplicasTrigger(); crt != nil {
cr = &kvserverpb.ChangeReplicas{
ChangeReplicasTrigger: *crt,
}
}
}
p := &ProposalData{
ctx: context.Background(),
Expand All @@ -291,6 +307,7 @@ func (pc proposalCreator) newProposal(ba *kvpb.BatchRequest) *ProposalData {
ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{
IsLeaseRequest: isLeaseRequest,
State: &kvserverpb.ReplicaState{Lease: lease},
ChangeReplicas: cr,
},
},
Request: ba,
Expand All @@ -301,15 +318,11 @@ func (pc proposalCreator) newProposal(ba *kvpb.BatchRequest) *ProposalData {
}

func (pc proposalCreator) encodeProposal(p *ProposalData) []byte {
cmdLen := p.command.Size()
needed := raftlog.RaftCommandPrefixLen + cmdLen + kvserverpb.MaxRaftCommandFooterSize()
data := make([]byte, raftlog.RaftCommandPrefixLen, needed)
raftlog.EncodeRaftCommandPrefix(data, raftlog.EntryEncodingStandardWithoutAC, p.idKey)
data = data[:raftlog.RaftCommandPrefixLen+p.command.Size()]
if _, err := protoutil.MarshalTo(p.command, data[raftlog.RaftCommandPrefixLen:]); err != nil {
b, err := raftlog.EncodeCommand(context.Background(), p.command, p.idKey, nil /* raftAdmissionMeta */)
if err != nil {
panic(err)
}
return data
return b
}

// TestProposalBuffer tests the basic behavior of the Raft proposal buffer.
Expand Down Expand Up @@ -792,6 +805,78 @@ func TestProposalBufferRejectUnsafeLeaseTransfer(t *testing.T) {
}
}

// TestProposalBufferRejectStaleChangeReplicasConfChange is the unit-level
// regression test for [1]: when the raft group swaps a proposed conf change
// for an empty entry, the proposal buffer must reject the proposal with an
// error. TestInvalidConfChangeRejection covers the same scenario end-to-end.
//
// [1]: https://github.com/cockroachdb/cockroach/issues/105797
func TestProposalBufferRejectStaleChangeReplicasConfChange(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	const wantReplicaID uint64 = 1
	firstIndex := kvpb.RaftIndex(5)

	var p testProposer
	var pc proposalCreator
	require.Equal(t, wantReplicaID, uint64(p.getReplicaID()))

	// Capture whatever error the buffer hands to the rejection hook.
	var seenErr *kvpb.Error
	p.onRejectProposalWithErrLocked = func(_ *ProposalData, pErr *kvpb.Error) {
		require.NotNil(t, pErr)
		seenErr = pErr
	}

	// Mimic what RawNode does when it gets a conf change that isn't compatible
	// with its active config: it proposes an empty entry instead. In practice,
	// because the config is set when applying commands, this can happen when a
	// stale ChangeReplicas is proposed but the RawNode has already applied
	// newer config changes.
	//
	// See https://github.com/etcd-io/raft/blob/4abd9e927c6d5db930dfdb80237ac584449aeec7/raft.go#L1254-L1257.
	replaceConfChange := func(msg raftpb.Message) {
		if msg.Entries[0].Type != raftpb.EntryConfChangeV2 {
			return
		}
		msg.Entries[0] = raftpb.Entry{Type: raftpb.EntryNormal}
	}
	p.raftGroup = &testProposerRaft{onProp: replaceConfChange}
	p.fi = firstIndex

	var b propBuf
	clock := hlc.NewClockForTesting(nil)
	tr := tracker.NewLockfreeTracker()
	b.Init(&p, tr, clock, cluster.MakeTestingClusterSettings())

	// The key and descriptor contents are unimportant; the request only needs
	// to carry a ChangeReplicasTrigger so that it is proposed as a conf change.
	desc := roachpb.NewRangeDescriptor(1, roachpb.RKeyMin, roachpb.RKeyMax,
		roachpb.MakeReplicaSet([]roachpb.ReplicaDescriptor{{NodeID: 1, StoreID: 1, ReplicaID: 1}}),
	)
	var ba kvpb.BatchRequest
	ba.Add(&kvpb.EndTxnRequest{
		RequestHeader: kvpb.RequestHeader{Key: keys.LocalMax},
		Commit:        true,
		InternalCommitTrigger: &roachpb.InternalCommitTrigger{
			ChangeReplicasTrigger: &roachpb.ChangeReplicasTrigger{Desc: desc},
		},
	})
	proposal := pc.newProposal(&ba)

	_, tok := b.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
	require.NoError(t, b.Insert(ctx, proposal, tok.Move(ctx)))
	require.NoError(t, b.flushLocked(ctx))
	require.ErrorContains(t, seenErr.GoError(), `config change rejected by raft`)
	// NB: the proposals map is not checked for emptiness here because the test
	// harness does not maintain it (we would only be testing the harness).
	// TestInvalidConfChangeRejection covers that end-to-end.
}

// Test that the propBuf properly assigns closed timestamps to proposals being
// flushed out of it. Each subtest proposes one command and checks for the
// expected closed timestamp being written to the proposal by the propBuf.
Expand Down