Skip to content

Commit

Permalink
kv: unflake TestDelegateSnapshot
Browse files Browse the repository at this point in the history
Fixes: cockroachdb#96841
Fixes: cockroachdb#96525

Previously this test would assume that all snapshots came from the
sending of snapshots through the AdminChangeReplicasRequest, which end up
as type OTHER. However, occasionally we get a spurious raft snapshot,
which makes this test flaky. This change ignores any raft snapshots that
are sent.

Epic: none
Release note: None
  • Loading branch information
andrewbaptist committed Apr 19, 2023
1 parent 5c78123 commit 89aa132
Showing 1 changed file with 62 additions and 53 deletions.
115 changes: 62 additions & 53 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -352,74 +353,82 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {

// TestDelegateSnapshot verifies that the correct delegate is chosen when
// sending snapshots to stores.
// TODO: It is currently disabled because with raft snapshots sometimes being
// required and not going through snapshot delegation, the results are
// unpredictable.
func TestDelegateSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 96841, "Occasionally fails until 87553 is resolved")

ctx := context.Background()

// Synchronize on the moment before the snapshot gets sent to measure the
// state at that time.
requestChannel := make(chan *kvserverpb.DelegateSendSnapshotRequest, 10)

setupFn := func(t *testing.T) (
*testcluster.TestCluster,
roachpb.Key,
) {
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.DisableRaftSnapshotQueue = true

ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) {
// Record snapshots as they are sent on this channel for later analysis.
requestChannel := make(chan *kvserverpb.DelegateSendSnapshotRequest, 2)
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) {
// TODO(abaptist): Remove this condition once 96841 is fixed. This
// accounts for spurious raft snapshots that are sent. Also disable the raft
// snapshot queue using ltk.storeKnobs.DisableRaftSnapshotQueue = true.
if request.SenderQueueName != kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE {
requestChannel <- request
}
}

localityA := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "a"}}}
localityB := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "b"}}}

localityServerArgs := make(map[int]base.TestServerArgs)
localityServerArgs[0] = base.TestServerArgs{Knobs: knobs, Locality: localityA}
localityServerArgs[1] = base.TestServerArgs{Knobs: knobs, Locality: localityA}
localityServerArgs[2] = base.TestServerArgs{Knobs: knobs, Locality: localityB}
localityServerArgs[3] = base.TestServerArgs{Knobs: knobs, Locality: localityB}

tc := testcluster.StartTestCluster(
t, 4, base.TestClusterArgs{
ServerArgsPerNode: localityServerArgs,
ReplicationMode: base.ReplicationManual,
},
)
scratchKey := tc.ScratchRange(t)
return tc, scratchKey
localityA := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "a"}}}
localityB := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "b"}}}
localityServerArgs := map[int]base.TestServerArgs{
0: {Knobs: knobs, Locality: localityA},
1: {Knobs: knobs, Locality: localityA},
2: {Knobs: knobs, Locality: localityB},
3: {Knobs: knobs, Locality: localityB},
}

tc, scratchKey := setupFn(t)
defer tc.Stopper().Stop(ctx)
tc := testcluster.StartTestCluster(
t, 4, base.TestClusterArgs{
ServerArgsPerNode: localityServerArgs,
ReplicationMode: base.ReplicationManual,
},
)

// Node 3 (loc B) can only get the data from node 1 as its the only one that has it.
_ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2)...)
request := <-requestChannel
require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1))
// Drain the channel. Unfortunately there are occasionally spurious raft snapshots sent.
for len(requestChannel) > 0 {
<-requestChannel
scratchKey := tc.ScratchRange(t)
defer tc.Stopper().Stop(ctx)
// Node 3 (loc B) can only get the data from node 1 as it is the only replica.
{
_ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2)...)
request := <-requestChannel
require.Equalf(t, request.DelegatedSender.StoreID, roachpb.StoreID(1), "Wrong sender for request %+v", request)
}

// Node 4 (loc B) should get the snapshot from node 3 as its the same locality.
_ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(3)...)
request = <-requestChannel
require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(3))
for len(requestChannel) > 0 {
<-requestChannel
// Node 4 (loc B) should get the delegated snapshot from node 3 which is the
// same locality.
{
leaderDesc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(3)...)
// Wait until we are sure the delegate store has received the descriptor. It
// can't delegate until it receives the latest generation descriptor.
testutils.SucceedsSoon(t, func() error {
var desc roachpb.RangeDescriptor
rKey := keys.MustAddr(scratchKey)
require.NoError(t, tc.Servers[2].DB().GetProto(ctx, keys.RangeDescriptorKey(rKey), &desc))
if desc.Generation != leaderDesc.Generation {
return errors.Newf("Generation mismatch %d != %d", desc.Generation, leaderDesc.Generation)
}
return nil
})
request := <-requestChannel
require.Equalf(t, request.DelegatedSender.StoreID, roachpb.StoreID(3), "Wrong type of request %+v", request)
// TODO(abaptist): Remove this loop. Sometimes the delegated request fails
// due to Raft updating the generation before we can delegate. Even with the
// loop above to get the delegate on the correct generation, this is racy if
// there is a raft snapshot. We fall back to not using our snapshot if the
// generation fails, but this means that a second request is sent from the
// leaseholder.
for len(requestChannel) > 0 {
<-requestChannel
}
}

// Node 2 (loc A) should get the snapshot from node 1 as it is the same locality.
_ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1)...)
request = <-requestChannel
require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1))
// Node 2 (loc A) should get the snapshot from node 1 as they have the same locality.
{
_ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1)...)
request := <-requestChannel
require.Equalf(t, request.DelegatedSender.StoreID, roachpb.StoreID(1), "Wrong type of request %+v", request)
}
}

// TestDelegateSnapshotFails is a test that ensures we fail fast when the
Expand Down

0 comments on commit 89aa132

Please sign in to comment.