diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go
index 1e0f96a3f24b..1a5f67194065 100644
--- a/pkg/kv/kvserver/client_raft_test.go
+++ b/pkg/kv/kvserver/client_raft_test.go
@@ -5555,122 +5555,6 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
 	})
 }
 
-// TestReplicaRemovalClosesProposalQuota is a somewhat contrived test to ensure
-// that when a replica is removed that it closes its proposal quota if it has
-// one. This used to not be the case though it wasn't really very consequential.
-// Firstly, it's rare that a removed replica has a proposal quota to begin with.
-// Replicas which believe they are they leaseholder can only be removed if they
-// have lost the lease and are behind. This requires a network partition.
-// Regardless, there was never actually a problem because once the replica has
-// been removed, all commands will eventually fail and remove themselves from
-// the quota pool. This potentially adds latency as every pending request will
-// need to acquire and release their quota. This is almost always very fast as
-// it is rarely the case that there are more outstanding requests than there is
-// quota. Nevertheless, we have this test to ensure that the pool does get
-// closed if only to avoid asking the question and to ensure that that case is
-// tested.
-func TestReplicaRemovalClosesProposalQuota(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-	ctx := context.Background()
-	skip.WithIssue(t, 98412)
-	// These variables track the request count to make sure that all of the
-	// requests have made it to the Replica.
-	var (
-		rangeID         int64
-		putRequestCount int64
-	)
-	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
-		ServerArgs: base.TestServerArgs{
-			Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
-				DisableReplicaGCQueue: true,
-				TestingRequestFilter: func(_ context.Context, r *kvpb.BatchRequest) *kvpb.Error {
-					if r.RangeID == roachpb.RangeID(atomic.LoadInt64(&rangeID)) {
-						if _, isPut := r.GetArg(kvpb.Put); isPut {
-							atomic.AddInt64(&putRequestCount, 1)
-						}
-					}
-					return nil
-				},
-			}},
-			RaftConfig: base.RaftConfig{
-				// Set the proposal quota to a tiny amount so that each write will
-				// exceed it.
-				RaftProposalQuota: 512,
-				// RaftMaxInflightMsgs * RaftMaxSizePerMsg cannot exceed RaftProposalQuota.
-				RaftMaxInflightMsgs: 2,
-				RaftMaxSizePerMsg:   256,
-			},
-		},
-		ReplicationMode: base.ReplicationManual,
-	})
-	defer tc.Stopper().Stop(ctx)
-
-	key := tc.ScratchRange(t)
-	require.NoError(t, tc.WaitForSplitAndInitialization(key))
-	desc, err := tc.LookupRange(key)
-	require.NoError(t, err)
-	atomic.StoreInt64(&rangeID, int64(desc.RangeID))
-	tc.AddVotersOrFatal(t, key, tc.Target(1), tc.Target(2))
-	// Partition node 1 from receiving any requests or responses.
-	// This will prevent it from successfully replicating anything.
-	require.NoError(t, tc.WaitForSplitAndInitialization(key))
-	require.NoError(t, tc.TransferRangeLease(desc, tc.Target(0)))
-	store, repl := getFirstStoreReplica(t, tc.Server(0), key)
-	funcs := unreliableRaftHandlerFuncs{}
-	tc.Servers[0].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{
-		rangeID:                    desc.RangeID,
-		RaftMessageHandler:         store,
-		unreliableRaftHandlerFuncs: funcs,
-	})
-	// NB: We need to be sure that our Replica is the leaseholder for this
-	// test to make sense. It usually is.
-	lease, pendingLease := repl.GetLease()
-	if pendingLease != (roachpb.Lease{}) || lease.OwnedBy(store.StoreID()) {
-		skip.IgnoreLint(t, "the replica is not the leaseholder, this happens rarely under stressrace")
-	}
-	var wg sync.WaitGroup
-	const N = 100
-	for i := 0; i < N; i++ {
-		wg.Add(1)
-		go func(i int) {
-			defer wg.Done()
-			k := append(key[0:len(key):len(key)], strconv.Itoa(i)...)
-			_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), kvpb.Header{
-				RangeID: desc.RangeID,
-			}, putArgs(k, bytes.Repeat([]byte{'a'}, 1000)))
-			require.Regexp(t,
-				`result is ambiguous \(replica removed\)|`+
-					`r`+strconv.Itoa(int(desc.RangeID))+" was not found on s1", pErr.GoError())
-		}(i)
-	}
-	testutils.SucceedsSoon(t, func() error {
-		if seen := atomic.LoadInt64(&putRequestCount); seen < N {
-			return fmt.Errorf("saw %d, waiting for %d", seen, N)
-		}
-		return nil
-	})
-	desc = *repl.Desc()
-	fromReplDesc, found := desc.GetReplicaDescriptor(3)
-	require.True(t, found)
-	replDesc, found := desc.GetReplicaDescriptor(store.StoreID())
-	require.True(t, found)
-	newReplDesc := replDesc
-	newReplDesc.ReplicaID = desc.NextReplicaID
-	require.Nil(t, store.HandleRaftRequest(ctx, &kvserverpb.RaftMessageRequest{
-		RangeID:       desc.RangeID,
-		RangeStartKey: desc.StartKey,
-		FromReplica:   fromReplDesc,
-		ToReplica:     newReplDesc,
-		Message:       raftpb.Message{Type: raftpb.MsgVote, Term: 2},
-	}, noopRaftMessageResponseStream{}))
-	ts := waitForTombstone(t, store.TODOEngine(), desc.RangeID)
-	require.Equal(t, ts.NextReplicaID, desc.NextReplicaID)
-	wg.Wait()
-	_, err = repl.GetProposalQuota().Acquire(ctx, 1)
-	require.Regexp(t, "closed.*destroyed", err)
-}
-
 type noopRaftMessageResponseStream struct{}
 
 func (n noopRaftMessageResponseStream) Send(*kvserverpb.RaftMessageResponse) error {
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index f67d5a989501..2c60ecb1cd63 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -188,6 +188,8 @@ func (r *Replica) evalAndPropose(
 	// If the request requested that Raft consensus be performed asynchronously,
 	// return a proposal result immediately on the proposal's done channel.
 	// The channel's capacity will be large enough to accommodate this.
+	maybeFinishSpan := func() {}
+	defer func() { maybeFinishSpan() }() // NB: late binding is important
 	if ba.AsyncConsensus {
 		if ets := proposal.Local.DetachEndTxns(false /* alwaysOnly */); len(ets) != 0 {
 			// Disallow async consensus for commands with EndTxnIntents because
@@ -200,6 +202,12 @@
 		// Fork the proposal's context span so that the proposal's context
 		// can outlive the original proposer's context.
 		proposal.ctx, proposal.sp = tracing.ForkSpan(ctx, "async consensus")
+		if proposal.sp != nil {
+			// We can't leak this span if we fail to hand the proposal to the
+			// replication layer, so finish it later in this method if we are to
+			// return with an error. (On success, we'll reset this to a noop).
+			maybeFinishSpan = proposal.sp.Finish
+		}
 
 		// Signal the proposal's response channel immediately.
 		reply := *proposal.Local.Reply
@@ -292,6 +300,9 @@
 	if pErr != nil {
 		return nil, nil, "", nil, pErr
 	}
+	// We've successfully handed the proposal to the replication layer, so this
+	// method should not finish the trace span if we forked one off above.
+	maybeFinishSpan = func() {}
 	// Abandoning a proposal unbinds its context so that the proposal's client
 	// is free to terminate execution. However, it does nothing to try to
 	// prevent the command from succeeding. In particular, endCmds will still be
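Beyond the removal of the test, the `maybeFinishSpan` hunks in `replica_raft.go` hinge on Go's late binding of a variable called through a deferred closure: the cleanup is armed right after the span is forked and disarmed once the proposal has been handed to the replication layer, so only the error-return paths finish the span. The snippet below is a minimal standalone sketch of that pattern; the `span`, `handOff`, and `propose` names are invented for illustration and are not part of the CockroachDB code.

package main

import "fmt"

// span is a stand-in for a tracing span; it is not the CockroachDB
// tracing.Span API.
type span struct{ name string }

func (s *span) Finish() { fmt.Println("finished span:", s.name) }

// handOff is a hypothetical stand-in for handing a proposal to the
// replication layer.
func handOff(fail bool) error {
	if fail {
		return fmt.Errorf("proposal rejected")
	}
	return nil
}

// propose mirrors the shape of the evalAndPropose change: a deferred closure
// calls whatever function value maybeFinishSpan holds at return time.
func propose(async, fail bool) error {
	maybeFinishSpan := func() {}
	// NB: the extra func() wrapper is what makes the binding late; a bare
	// `defer maybeFinishSpan()` would capture the no-op value immediately.
	defer func() { maybeFinishSpan() }()

	if async {
		sp := &span{name: "async consensus"}
		// Arm the cleanup: if we return with an error below, the deferred
		// wrapper finishes the forked span instead of leaking it.
		maybeFinishSpan = sp.Finish
	}

	if err := handOff(fail); err != nil {
		return err // the deferred wrapper finishes the span here
	}

	// Success: the span now belongs to the in-flight proposal, so disarm
	// the cleanup and let the span outlive this call.
	maybeFinishSpan = func() {}
	return nil
}

func main() {
	_ = propose(true, true)  // prints "finished span: async consensus"
	_ = propose(true, false) // prints nothing; the span lives on
}

Disarming a single closure at the one success point keeps every early error return covered without finishing the span at each of them, which appears to be why the change prefers this over per-return cleanup.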