kvserver: remove TestReplicaRemovalClosesProposalQuota #98685

Merged
merged 2 commits on Mar 16, 2023
116 changes: 0 additions & 116 deletions pkg/kv/kvserver/client_raft_test.go
@@ -5555,122 +5555,6 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
	})
}

// TestReplicaRemovalClosesProposalQuota is a somewhat contrived test to ensure
// that when a replica is removed, it closes its proposal quota if it has one.
// This used not to be the case, though it wasn't very consequential. Firstly,
// it's rare that a removed replica has a proposal quota to begin with.
// Replicas which believe they are the leaseholder can only be removed if they
// have lost the lease and are behind. This requires a network partition.
// Regardless, there was never actually a problem because once the replica has
// been removed, all commands will eventually fail and remove themselves from
// the quota pool. This potentially adds latency, as every pending request will
// need to acquire and release its quota. This is almost always very fast as
// it is rarely the case that there are more outstanding requests than there is
// quota. Nevertheless, we have this test to ensure that the pool does get
// closed, if only to avoid asking the question and to ensure that that case is
// tested.
func TestReplicaRemovalClosesProposalQuota(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()
	skip.WithIssue(t, 98412)
	// These variables track the request count to make sure that all of the
	// requests have made it to the Replica.
	var (
		rangeID         int64
		putRequestCount int64
	)
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
				DisableReplicaGCQueue: true,
				TestingRequestFilter: func(_ context.Context, r *kvpb.BatchRequest) *kvpb.Error {
					if r.RangeID == roachpb.RangeID(atomic.LoadInt64(&rangeID)) {
						if _, isPut := r.GetArg(kvpb.Put); isPut {
							atomic.AddInt64(&putRequestCount, 1)
						}
					}
					return nil
				},
			}},
			RaftConfig: base.RaftConfig{
				// Set the proposal quota to a tiny amount so that each write will
				// exceed it.
				RaftProposalQuota: 512,
				// RaftMaxInflightMsgs * RaftMaxSizePerMsg cannot exceed RaftProposalQuota.
				RaftMaxInflightMsgs: 2,
				RaftMaxSizePerMsg:   256,
			},
		},
		ReplicationMode: base.ReplicationManual,
	})
	defer tc.Stopper().Stop(ctx)

	key := tc.ScratchRange(t)
	require.NoError(t, tc.WaitForSplitAndInitialization(key))
	desc, err := tc.LookupRange(key)
	require.NoError(t, err)
	atomic.StoreInt64(&rangeID, int64(desc.RangeID))
	tc.AddVotersOrFatal(t, key, tc.Target(1), tc.Target(2))
	// Partition node 1 from receiving any requests or responses.
	// This will prevent it from successfully replicating anything.
	require.NoError(t, tc.WaitForSplitAndInitialization(key))
	require.NoError(t, tc.TransferRangeLease(desc, tc.Target(0)))
	store, repl := getFirstStoreReplica(t, tc.Server(0), key)
	funcs := unreliableRaftHandlerFuncs{}
	tc.Servers[0].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{
		rangeID:                    desc.RangeID,
		RaftMessageHandler:         store,
		unreliableRaftHandlerFuncs: funcs,
	})
	// NB: We need to be sure that our Replica is the leaseholder for this
	// test to make sense. It usually is.
	lease, pendingLease := repl.GetLease()
	if pendingLease != (roachpb.Lease{}) || !lease.OwnedBy(store.StoreID()) {
		skip.IgnoreLint(t, "the replica is not the leaseholder, this happens rarely under stressrace")
	}
	var wg sync.WaitGroup
	const N = 100
	for i := 0; i < N; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			k := append(key[0:len(key):len(key)], strconv.Itoa(i)...)
			_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), kvpb.Header{
				RangeID: desc.RangeID,
			}, putArgs(k, bytes.Repeat([]byte{'a'}, 1000)))
			require.Regexp(t,
				`result is ambiguous \(replica removed\)|`+
					`r`+strconv.Itoa(int(desc.RangeID))+" was not found on s1", pErr.GoError())
		}(i)
	}
	testutils.SucceedsSoon(t, func() error {
		if seen := atomic.LoadInt64(&putRequestCount); seen < N {
			return fmt.Errorf("saw %d, waiting for %d", seen, N)
		}
		return nil
	})
	desc = *repl.Desc()
	fromReplDesc, found := desc.GetReplicaDescriptor(3)
	require.True(t, found)
	replDesc, found := desc.GetReplicaDescriptor(store.StoreID())
	require.True(t, found)
	newReplDesc := replDesc
	newReplDesc.ReplicaID = desc.NextReplicaID
	require.Nil(t, store.HandleRaftRequest(ctx, &kvserverpb.RaftMessageRequest{
		RangeID:       desc.RangeID,
		RangeStartKey: desc.StartKey,
		FromReplica:   fromReplDesc,
		ToReplica:     newReplDesc,
		Message:       raftpb.Message{Type: raftpb.MsgVote, Term: 2},
	}, noopRaftMessageResponseStream{}))
	ts := waitForTombstone(t, store.TODOEngine(), desc.RangeID)
	require.Equal(t, ts.NextReplicaID, desc.NextReplicaID)
	wg.Wait()
	_, err = repl.GetProposalQuota().Acquire(ctx, 1)
	require.Regexp(t, "closed.*destroyed", err)
}

type noopRaftMessageResponseStream struct{}

func (n noopRaftMessageResponseStream) Send(*kvserverpb.RaftMessageResponse) error {
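The removed test exercised the property that a destroyed replica's proposal quota pool is closed, so that waiters fail fast instead of each having to acquire and release quota in turn. The sketch below illustrates that idea in isolation; it is a minimal stand-in under assumed names (quotaPool, Acquire, Release, Close), not the CockroachDB quotapool package.

```go
// A minimal sketch (not the CockroachDB quotapool API) of a proposal quota
// pool that can be closed when its replica is destroyed, so that waiters
// fail fast rather than blocking on quota that will never be returned.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type quotaPool struct {
	mu        sync.Mutex
	quota     int64         // currently available quota
	closed    error         // non-nil once the pool has been closed
	waitersCh chan struct{} // closed and replaced to wake waiters
}

func newQuotaPool(q int64) *quotaPool {
	return &quotaPool{quota: q, waitersCh: make(chan struct{})}
}

// Acquire blocks until n units of quota are available, the context is
// canceled, or the pool is closed.
func (qp *quotaPool) Acquire(ctx context.Context, n int64) error {
	for {
		qp.mu.Lock()
		if qp.closed != nil {
			err := qp.closed
			qp.mu.Unlock()
			return err
		}
		if qp.quota >= n {
			qp.quota -= n
			qp.mu.Unlock()
			return nil
		}
		wait := qp.waitersCh
		qp.mu.Unlock()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-wait: // quota was returned or the pool was closed; re-check
		}
	}
}

// Release returns n units of quota and wakes any waiters.
func (qp *quotaPool) Release(n int64) {
	qp.mu.Lock()
	qp.quota += n
	qp.notifyLocked()
	qp.mu.Unlock()
}

// Close fails all current and future acquisitions with reason.
func (qp *quotaPool) Close(reason error) {
	qp.mu.Lock()
	qp.closed = reason
	qp.notifyLocked()
	qp.mu.Unlock()
}

func (qp *quotaPool) notifyLocked() {
	close(qp.waitersCh)
	qp.waitersCh = make(chan struct{})
}

func main() {
	qp := newQuotaPool(512)
	_ = qp.Acquire(context.Background(), 512) // exhaust the quota

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// This waiter would block indefinitely on quota; closing the pool
		// when the replica is destroyed makes it fail immediately instead.
		err := qp.Acquire(context.Background(), 512)
		fmt.Println(err)
	}()
	qp.Close(errors.New("quota pool closed: replica destroyed"))
	wg.Wait()
}
```

Closing the pool wakes every waiter at once, which is the behavior the removed test asserted via `GetProposalQuota().Acquire` returning a "closed ... destroyed" error.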
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
@@ -188,6 +188,8 @@ func (r *Replica) evalAndPropose(
	// If the request requested that Raft consensus be performed asynchronously,
	// return a proposal result immediately on the proposal's done channel.
	// The channel's capacity will be large enough to accommodate this.
	maybeFinishSpan := func() {}
	defer func() { maybeFinishSpan() }() // NB: late binding is important
	if ba.AsyncConsensus {
		if ets := proposal.Local.DetachEndTxns(false /* alwaysOnly */); len(ets) != 0 {
			// Disallow async consensus for commands with EndTxnIntents because
@@ -200,6 +202,12 @@
		// Fork the proposal's context span so that the proposal's context
		// can outlive the original proposer's context.
		proposal.ctx, proposal.sp = tracing.ForkSpan(ctx, "async consensus")
		if proposal.sp != nil {
			// We can't leak this span if we fail to hand the proposal to the
			// replication layer, so finish it later in this method if we are to
			// return with an error. (On success, we'll reset this to a noop.)
			maybeFinishSpan = proposal.sp.Finish
		}

		// Signal the proposal's response channel immediately.
		reply := *proposal.Local.Reply
@@ -292,6 +300,9 @@ func (r *Replica) evalAndPropose(
	if pErr != nil {
		return nil, nil, "", nil, pErr
	}
	// We've successfully handed the proposal to the replication layer, so this
	// method should not finish the trace span if we forked one off above.
	maybeFinishSpan = func() {}
	// Abandoning a proposal unbinds its context so that the proposal's client
	// is free to terminate execution. However, it does nothing to try to
	// prevent the command from succeeding. In particular, endCmds will still be
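The replica_raft.go change above relies on a late-bound deferred cleanup: maybeFinishSpan starts as a no-op, is armed once a span is forked, and is reset to a no-op after the proposal has been handed to the replication layer. The sketch below shows just that control-flow pattern under assumed names (span, propose, handOff); it is not the actual evalAndPropose code or the CockroachDB tracing API.

```go
// A minimal sketch of a late-bound deferred cleanup: arm it when a resource
// is created, disarm it once ownership has been handed off successfully.
package main

import (
	"errors"
	"fmt"
)

type span struct{ name string }

func (s *span) Finish() { fmt.Println("finished span:", s.name) }

func propose(async bool, handOff func(*span) error) error {
	// The deferred closure calls maybeFinish by reference, so reassignments
	// below take effect at return time. `defer maybeFinish()` would instead
	// bind the initial no-op immediately and never run the real finisher.
	maybeFinish := func() {}
	defer func() { maybeFinish() }()

	var sp *span
	if async {
		sp = &span{name: "async consensus"}
		// Arm the cleanup: if we return with an error below, the span must
		// not leak.
		maybeFinish = sp.Finish
	}

	if err := handOff(sp); err != nil {
		return err // the deferred cleanup finishes the span
	}

	// Ownership now belongs to the replication layer, which will finish the
	// span itself, so disarm the deferred cleanup.
	maybeFinish = func() {}
	return nil
}

func main() {
	// Hand-off fails: the span is finished by the deferred cleanup.
	_ = propose(true, func(*span) error { return errors.New("raft group destroyed") })
	// Hand-off succeeds: the deferred cleanup is a no-op and nothing is printed.
	_ = propose(true, func(*span) error { return nil })
}
```

The key detail is `defer func() { maybeFinishSpan() }()`: the closure reads the variable when the function returns, so the reset to a no-op after a successful hand-off actually takes effect.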