Skip to content

Commit

Permalink
Merge #98685
Browse files Browse the repository at this point in the history
98685: kvserver: remove TestReplicaRemovalClosesProposalQuota r=pavelkalinnikov a=tbg

As this test explained in its comment, the circumstances it was
seeking to test were very rarely met and even if the behavior
were missing, this wouldn't cause any problems.

The test has been skipped (accidentally) for a long time, was flaky even
then (leaking trace spans, see below), and it's not easy to
enact the situation the test wishes to construct in an idiomatic way.

It's pretty clear that the lines the test wants are present:

https://github.com/cockroachdb/cockroach/blob/736a67e0d36cc545bf74d65db069ee895ff9bea0/pkg/kv/kvserver/replica_destroy.go#L174-L176

It's better to remove the test at this point.

Closes #96932.

Also, fix a buglet that the test "accidentally" uncovered:

Previously, if `evalAndPropose` returned with an error while trying to
propose a pipelined write, it would leak the trace span.

Make sure this doesn't happen. The test that exercised this got deleted
in the previous commit, but this is still a bug that could cause a
larger leak if a condition were ever added to `evalAndPropose` which
could cause a large amount of errors (perhaps spread out over a longer
time period) in production clusters.

Unfortunately, writing a test for this is likely to be a net negative;
if the reviewer feels strongly I can add a testing knob to inject an
error in the right place in this method and exercise it that way.

Touches #96932.

Epic: none
Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Mar 16, 2023
2 parents b2c9fa6 + 2a36ef3 commit b6978e9
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 116 deletions.
116 changes: 0 additions & 116 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5552,122 +5552,6 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
})
}

// TestReplicaRemovalClosesProposalQuota is a somewhat contrived test to ensure
// that when a replica is removed that it closes its proposal quota if it has
// one. This used to not be the case though it wasn't really very consequential.
// Firstly, it's rare that a removed replica has a proposal quota to begin with.
// Replicas which believe they are they leaseholder can only be removed if they
// have lost the lease and are behind. This requires a network partition.
// Regardless, there was never actually a problem because once the replica has
// been removed, all commands will eventually fail and remove themselves from
// the quota pool. This potentially adds latency as every pending request will
// need to acquire and release their quota. This is almost always very fast as
// it is rarely the case that there are more outstanding requests than there is
// quota. Nevertheless, we have this test to ensure that the pool does get
// closed if only to avoid asking the question and to ensure that that case is
// tested.
func TestReplicaRemovalClosesProposalQuota(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
skip.WithIssue(t, 98412)
// These variables track the request count to make sure that all of the
// requests have made it to the Replica.
var (
rangeID int64
putRequestCount int64
)
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
DisableReplicaGCQueue: true,
TestingRequestFilter: func(_ context.Context, r *kvpb.BatchRequest) *kvpb.Error {
if r.RangeID == roachpb.RangeID(atomic.LoadInt64(&rangeID)) {
if _, isPut := r.GetArg(kvpb.Put); isPut {
atomic.AddInt64(&putRequestCount, 1)
}
}
return nil
},
}},
RaftConfig: base.RaftConfig{
// Set the proposal quota to a tiny amount so that each write will
// exceed it.
RaftProposalQuota: 512,
// RaftMaxInflightMsgs * RaftMaxSizePerMsg cannot exceed RaftProposalQuota.
RaftMaxInflightMsgs: 2,
RaftMaxSizePerMsg: 256,
},
},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

key := tc.ScratchRange(t)
require.NoError(t, tc.WaitForSplitAndInitialization(key))
desc, err := tc.LookupRange(key)
require.NoError(t, err)
atomic.StoreInt64(&rangeID, int64(desc.RangeID))
tc.AddVotersOrFatal(t, key, tc.Target(1), tc.Target(2))
// Partition node 1 from receiving any requests or responses.
// This will prevent it from successfully replicating anything.
require.NoError(t, tc.WaitForSplitAndInitialization(key))
require.NoError(t, tc.TransferRangeLease(desc, tc.Target(0)))
store, repl := getFirstStoreReplica(t, tc.Server(0), key)
funcs := unreliableRaftHandlerFuncs{}
tc.Servers[0].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{
rangeID: desc.RangeID,
RaftMessageHandler: store,
unreliableRaftHandlerFuncs: funcs,
})
// NB: We need to be sure that our Replica is the leaseholder for this
// test to make sense. It usually is.
lease, pendingLease := repl.GetLease()
if pendingLease != (roachpb.Lease{}) || lease.OwnedBy(store.StoreID()) {
skip.IgnoreLint(t, "the replica is not the leaseholder, this happens rarely under stressrace")
}
var wg sync.WaitGroup
const N = 100
for i := 0; i < N; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
k := append(key[0:len(key):len(key)], strconv.Itoa(i)...)
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), kvpb.Header{
RangeID: desc.RangeID,
}, putArgs(k, bytes.Repeat([]byte{'a'}, 1000)))
require.Regexp(t,
`result is ambiguous \(replica removed\)|`+
`r`+strconv.Itoa(int(desc.RangeID))+" was not found on s1", pErr.GoError())
}(i)
}
testutils.SucceedsSoon(t, func() error {
if seen := atomic.LoadInt64(&putRequestCount); seen < N {
return fmt.Errorf("saw %d, waiting for %d", seen, N)
}
return nil
})
desc = *repl.Desc()
fromReplDesc, found := desc.GetReplicaDescriptor(3)
require.True(t, found)
replDesc, found := desc.GetReplicaDescriptor(store.StoreID())
require.True(t, found)
newReplDesc := replDesc
newReplDesc.ReplicaID = desc.NextReplicaID
require.Nil(t, store.HandleRaftRequest(ctx, &kvserverpb.RaftMessageRequest{
RangeID: desc.RangeID,
RangeStartKey: desc.StartKey,
FromReplica: fromReplDesc,
ToReplica: newReplDesc,
Message: raftpb.Message{Type: raftpb.MsgVote, Term: 2},
}, noopRaftMessageResponseStream{}))
ts := waitForTombstone(t, store.TODOEngine(), desc.RangeID)
require.Equal(t, ts.NextReplicaID, desc.NextReplicaID)
wg.Wait()
_, err = repl.GetProposalQuota().Acquire(ctx, 1)
require.Regexp(t, "closed.*destroyed", err)
}

type noopRaftMessageResponseStream struct{}

func (n noopRaftMessageResponseStream) Send(*kvserverpb.RaftMessageResponse) error {
Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ func (r *Replica) evalAndPropose(
// If the request requested that Raft consensus be performed asynchronously,
// return a proposal result immediately on the proposal's done channel.
// The channel's capacity will be large enough to accommodate this.
maybeFinishSpan := func() {}
defer func() { maybeFinishSpan() }() // NB: late binding is important
if ba.AsyncConsensus {
if ets := proposal.Local.DetachEndTxns(false /* alwaysOnly */); len(ets) != 0 {
// Disallow async consensus for commands with EndTxnIntents because
Expand All @@ -200,6 +202,12 @@ func (r *Replica) evalAndPropose(
// Fork the proposal's context span so that the proposal's context
// can outlive the original proposer's context.
proposal.ctx, proposal.sp = tracing.ForkSpan(ctx, "async consensus")
if proposal.sp != nil {
// We can't leak this span if we fail to hand the proposal to the
// replication layer, so finish it later in this method if we are to
// return with an error. (On success, we'll reset this to a noop).
maybeFinishSpan = proposal.sp.Finish
}

// Signal the proposal's response channel immediately.
reply := *proposal.Local.Reply
Expand Down Expand Up @@ -292,6 +300,9 @@ func (r *Replica) evalAndPropose(
if pErr != nil {
return nil, nil, "", nil, pErr
}
// We've successfully handed the proposal to the replication layer, so this
// method should not finish the trace span if we forked one off above.
maybeFinishSpan = func() {}
// Abandoning a proposal unbinds its context so that the proposal's client
// is free to terminate execution. However, it does nothing to try to
// prevent the command from succeeding. In particular, endCmds will still be
Expand Down

0 comments on commit b6978e9

Please sign in to comment.