Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: fix merge queue test failure due to race #100378

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1732,7 +1732,9 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()
var activateSnapshotTestingKnob int64
var snapshotStarted int64
blockSnapshot := make(chan struct{})
snapshotInProgress := make(chan struct{})
tc := testcluster.StartTestCluster(
t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
Expand All @@ -1744,6 +1746,14 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
DisableLoadBasedSplitting: true,
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&activateSnapshotTestingKnob) == 1 {
// While the snapshot RPC should only happen once given
// that the cluster is running under manual replication,
// retries or other mechanisms can cause this to be called
// multiple times, so let's ensure we only close the channel
// snapshotInProgress once by using the snapshotStarted flag.
if atomic.CompareAndSwapInt64(&snapshotStarted, 0, 1) {
close(snapshotInProgress)
}
<-blockSnapshot
}
return nil
Expand Down Expand Up @@ -1772,7 +1782,7 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
require.NoError(t, err)

select {
case <-time.After(100 * time.Millisecond):
case <-snapshotInProgress:
// Continue.
case <-replicationChange:
t.Fatal("did not expect the replication change to complete")
Expand All @@ -1787,18 +1797,16 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
// TestCluster currently overrides this when used with ReplicationManual.
db.Exec(t, `SET CLUSTER SETTING kv.range_merge.queue_enabled = true`)

testutils.SucceedsSoon(t, func() error {
// While this replication change is stalled, we'll trigger a merge and
// ensure that the merge correctly notices that there is a snapshot in
// flight and ignores the range.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
_, processErr, enqueueErr := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, enqueueErr)
require.True(t, kvserver.IsReplicationChangeInProgressError(processErr))
return nil
})
// While this replication change is stalled, we'll trigger a merge and
// ensure that the merge correctly notices that there is a snapshot in
// flight and ignores the range.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
_, processErr, enqueueErr := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, enqueueErr)
require.Truef(t, kvserver.IsReplicationChangeInProgressError(processErr),
"expected replication change in progress error, got %+v", processErr)
}

func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
Expand Down