diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 16a7f213a587..3f0e115f3cf6 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -6933,3 +6934,63 @@ func TestStoreMetricsOnIncomingOutgoingMsg(t *testing.T) { require.Equal(t, expected, actual) }) } + +// TestInvalidConfChangeRejection is a regression test for [1]. See also +// TestProposalBufferRejectStaleChangeReplicasConfChange for a unit test. +// +// [1]: https://github.com/cockroachdb/cockroach/issues/105797 +func TestInvalidConfChangeRejection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // This is a regression test against a stuck command, so set a timeout to get + // a shot at a graceful failure on regression. + ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration) + defer cancel() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ReplicationMode: base.ReplicationManual}) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + + repl := tc.GetFirstStoreFromServer(t, 0).LookupReplica(keys.MustAddr(k)) + + // Try to leave a joint config even though we're not in one. This is something + // that will lead raft to propose an empty entry instead of our conf change. 
+ // + // See: https://github.com/cockroachdb/cockroach/issues/105797 + var ba kvpb.BatchRequest + now := tc.Server(0).Clock().Now() + txn := roachpb.MakeTransaction("fake", k, isolation.Serializable, roachpb.NormalUserPriority, now, 500*time.Millisecond.Nanoseconds(), 1) + ba.Txn = &txn + ba.Timestamp = now + ba.Add(&kvpb.EndTxnRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: k, + }, + Commit: true, + InternalCommitTrigger: &roachpb.InternalCommitTrigger{ + ChangeReplicasTrigger: &roachpb.ChangeReplicasTrigger{ + Desc: repl.Desc(), + }, + }, + }) + + _, pErr := repl.Send(ctx, &ba) + require.ErrorContains(t, pErr.GoError(), `config change rejected by raft`) + + // We didn't leak the latch. + _, err := tc.Servers[0].DB().Get(ctx, k) + require.NoError(t, err) + + // Double check that we don't have a proposal in the map. (We may not have + // leaked the latch, but could still have leaked the proposal). This is morally always + // zero, but since it's a TestCluster, guard against another random request + // blipping in. + testutils.SucceedsSoon(t, func() error { + if n := repl.State(ctx).NumPending; n > 0 { + return errors.Errorf("%d proposals pending", n) + } + return nil + }) +} diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index cc770dcd1d62..efaf0991a078 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -192,6 +192,11 @@ type proposer interface { lease *roachpb.Lease, reason raftutil.ReplicaNeedsSnapshotStatus, ) + // rejectProposalWithErrLocked rejects the proposal with the given error. This + // should only be used if none of the other `rejectProposalX` methods apply. + rejectProposalWithErrLocked( + ctx context.Context, prop *ProposalData, pErr *kvpb.Error, + ) // leaseDebugRLocked returns info on the current lease. 
leaseDebugRLocked() string @@ -581,6 +586,28 @@ func (b *propBuf) FlushLockedWithRaftGroup( firstErr = err continue } + if msg.Entries[0].Type == raftpb.EntryNormal { + // If we are trying to commit a ChangeReplicas but the lease has since + // changed, it's possible that the new leaseholder has already committed + // additional replication changes that our RawNode has applied. In that + // case, its config may be incompatible with what our stale + // ChangeReplicas wants to do, and raft will "reject" it by proposing an + // empty entry instead. We don't need this protection since it is + // conferred by our below-raft checks (plus, on the leaseholder, latches + // that linearize application of replication changes). In fact, it's + // detrimental, because an empty entry doesn't map back to our inflight + // proposal, and so in effect the stale ChangeReplicas will never "show + // up" as replicated, essentially leaking a proposal and the associated + // latches. We could disable the raft check, but there currently isn't + // an option for that. Instead, we detect when RawNode has replaced our + // entry with a "normal" entry and terminate the proposal here (note + // that it is currently not in the proposals map). + b.p.rejectProposalWithErrLocked(ctx, p, kvpb.NewErrorf(`config change rejected by raft; please retry`)) + // At the time of writing, this is superfluous, but if any code gets + // added at the end of the loop, it likely shouldn't affect this + // proposal since we're skipping it. + continue + } } else { // Add to the batch of entries that will soon be proposed. 
It is // possible that this batching can cause the batched MsgProp to grow diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index a80d6588cfd8..8b8268a12f52 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" @@ -34,7 +35,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -65,6 +65,7 @@ type testProposer struct { // If nil, rejectProposalWithLeaseTransferRejectedLocked() panics. onRejectProposalWithLeaseTransferRejectedLocked func( lease *roachpb.Lease, reason raftutil.ReplicaNeedsSnapshotStatus) + onRejectProposalWithErrLocked func(*ProposalData, *kvpb.Error) // validLease is returned by ownsValidLease() validLease bool // leaderNotLive is returned from shouldCampaignOnRedirect(). @@ -91,6 +92,7 @@ type testProposerRaft struct { // proposals are the commands that the propBuf flushed (i.e. passed to the // Raft group) and have not yet been consumed with consumeProposals(). proposals []kvserverpb.RaftCommand + onProp func(raftpb.Message) // invoked on Step with MsgProp campaigned bool } @@ -100,6 +102,9 @@ func (t *testProposerRaft) Step(msg raftpb.Message) error { if msg.Type != raftpb.MsgProp { return nil } + if t.onProp != nil { + t.onProp(msg) + } // Decode and save all the commands. 
for _, e := range msg.Entries { ent, err := raftlog.NewEntry(e) @@ -260,6 +265,15 @@ func (t *testProposer) rejectProposalWithLeaseTransferRejectedLocked( t.onRejectProposalWithLeaseTransferRejectedLocked(lease, reason) } +func (t *testProposer) rejectProposalWithErrLocked( + _ context.Context, prop *ProposalData, pErr *kvpb.Error, +) { + if t.onRejectProposalWithErrLocked == nil { + panic("unexpected rejectProposalWithErrLocked() call") + } + t.onRejectProposalWithErrLocked(prop, pErr) +} + // proposalCreator holds on to a lease and creates proposals using it. type proposalCreator struct { lease kvserverpb.LeaseStatus @@ -287,12 +301,19 @@ func (pc proposalCreator) newLeaseTransferProposal(lease roachpb.Lease) *Proposa func (pc proposalCreator) newProposal(ba *kvpb.BatchRequest) *ProposalData { var lease *roachpb.Lease var isLeaseRequest bool + var cr *kvserverpb.ChangeReplicas switch v := ba.Requests[0].GetInner().(type) { case *kvpb.RequestLeaseRequest: lease = &v.Lease isLeaseRequest = true case *kvpb.TransferLeaseRequest: lease = &v.Lease + case *kvpb.EndTxnRequest: + if crt := v.InternalCommitTrigger.GetChangeReplicasTrigger(); crt != nil { + cr = &kvserverpb.ChangeReplicas{ + ChangeReplicasTrigger: *crt, + } + } } p := &ProposalData{ ctx: context.Background(), @@ -301,6 +322,7 @@ func (pc proposalCreator) newProposal(ba *kvpb.BatchRequest) *ProposalData { ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{ IsLeaseRequest: isLeaseRequest, State: &kvserverpb.ReplicaState{Lease: lease}, + ChangeReplicas: cr, }, }, Request: ba, @@ -798,6 +820,78 @@ func TestProposalBufferRejectUnsafeLeaseTransfer(t *testing.T) { } } +// TestProposalBufferRejectStaleChangeReplicasConfChange is a regression test +// for [1]. See also TestInvalidConfChangeRejection for an end-to-end test. 
+// +// [1]: https://github.com/cockroachdb/cockroach/issues/105797 +func TestProposalBufferRejectStaleChangeReplicasConfChange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + proposer := uint64(1) + proposerFirstIndex := kvpb.RaftIndex(5) + + var p testProposer + var pc proposalCreator + require.Equal(t, proposer, uint64(p.getReplicaID())) + + var seenErr *kvpb.Error + p.onRejectProposalWithErrLocked = func(proposalData *ProposalData, pErr *kvpb.Error) { + require.NotNil(t, pErr) + seenErr = pErr + } + + r := &testProposerRaft{ + onProp: func(msg raftpb.Message) { + // Mimic what RawNode does when it gets a conf change that isn't + // compatible with its active config: proposing an empty entry instead. In + // practice, because the config is set when applying commands, this can + // happen when a stale ChangeReplicas is proposed but the RawNode has + // already applied newer config changes. + // + // See https://github.com/etcd-io/raft/blob/4abd9e927c6d5db930dfdb80237ac584449aeec7/raft.go#L1254-L1257. 
+ if msg.Entries[0].Type == raftpb.EntryConfChangeV2 { + msg.Entries[0] = raftpb.Entry{Type: raftpb.EntryNormal} + } + }, + } + p.raftGroup = r + p.fi = proposerFirstIndex + + var b propBuf + clock := hlc.NewClockForTesting(nil) + tr := tracker.NewLockfreeTracker() + b.Init(&p, tr, clock, cluster.MakeTestingClusterSettings()) + + k := keys.LocalMax // unimportant + var ba kvpb.BatchRequest + ba.Add(&kvpb.EndTxnRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: k, + }, + Commit: true, + InternalCommitTrigger: &roachpb.InternalCommitTrigger{ + ChangeReplicasTrigger: &roachpb.ChangeReplicasTrigger{ + Desc: roachpb.NewRangeDescriptor(1, roachpb.RKeyMin, roachpb.RKeyMax, + roachpb.MakeReplicaSet([]roachpb.ReplicaDescriptor{{NodeID: 1, StoreID: 1, ReplicaID: 1}}), + ), + }, + }, + }) + pd := pc.newProposal(&ba) + + _, tok := b.TrackEvaluatingRequest(ctx, hlc.MinTimestamp) + err := b.Insert(ctx, pd, tok.Move(ctx)) + require.NoError(t, err) + require.NoError(t, b.flushLocked(ctx)) + require.ErrorContains(t, seenErr.GoError(), `config change rejected by raft`) + // NB: we don't check that the proposals map is empty because the test harness + // currently doesn't do it (we'd really be testing the test harness only + // anyway). We have coverage for this end-to-end through + // TestInvalidConfChangeRejection, though. +} + // Test that the propBuf properly assigns closed timestamps to proposals being // flushed out of it. Each subtest proposes one command and checks for the // expected closed timestamp being written to the proposal by the propBuf.